27 changes: 24 additions & 3 deletions pkg/puller/export_test.go
@@ -6,13 +6,34 @@ package puller

 import "github.com/ethersphere/bee/v2/pkg/swarm"
 
-var PeerIntervalKey = peerIntervalKey
+type (
+	PeerTreeNodeValue = peerTreeNodeValue
+)
+
+var (
+	PeerIntervalKey = peerIntervalKey
+	NewPeerTreeNode = newPeerTreeNode
+)
+
+// NewTreeNode is a wrapper for the generic newTreeNode function for testing
+func NewTreeNode[T any](key []byte, p *T, level uint8) *TreeNode[T] {
+	return newTreeNode(key, p, level)
+}

 // IsSyncing returns true if any of the bins is syncing
 func (p *Puller) IsSyncing(addr swarm.Address) bool {
 	p.syncPeersMtx.Lock()
 	defer p.syncPeersMtx.Unlock()
-	_, ok := p.syncPeers[addr.ByteString()]
-	return ok
+	peer, ok := p.syncPeers[addr.ByteString()]
+	if !ok {
+		return false
+	}
+	for bin := range p.bins {
+		if peer.isBinSyncing(bin) {
+			return true
+		}
+	}
+	return false
 }
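
A minimal sketch of how a test might exercise the exported wrapper above; the test body and assertions are assumptions, only NewTreeNode, PeerTreeNodeValue, and the SyncBins field come from this diff:

```go
func TestNewTreeNode(t *testing.T) {
	v := puller.PeerTreeNodeValue{SyncBins: make([]bool, 4)}
	n := puller.NewTreeNode([]byte{0x01}, &v, 0) // key, value pointer, level
	if n == nil {
		t.Fatal("expected a tree node")
	}
}
```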

func (p *Puller) IsBinSyncing(addr swarm.Address, bin uint8) bool {
97 changes: 55 additions & 42 deletions pkg/puller/puller.go
@@ -40,8 +40,6 @@ const (
 	recalcPeersDur = time.Minute * 5
 
 	maxChunksPerSecond = 1000 // roughly 4 MB/s
-
-	maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing.
 )

type Options struct {
@@ -134,7 +132,8 @@ func (p *Puller) manage(ctx context.Context) {

 	var prevRadius uint8
 
-	onChange := func() {
+	// change in radius or in neighborhood peerset
+	changeCheck := func() {
 		p.syncPeersMtx.Lock()
 		defer p.syncPeersMtx.Unlock()
 
@@ -152,32 +151,52 @@
 			}
 			p.logger.Debug("radius decrease", "old_radius", prevRadius, "new_radius", newRadius)
 		}
+		changed := newRadius != prevRadius
Review comment (Contributor): Can you please rename this to something like radiusChanged or rChanged? It would be easier to figure out in the code what this boolean is all about.

Reply (Member, author): It relates to the pulling parameters having changed. That partially includes a radius change, but it also covers when a newly connected peer is a neighbor, or when a neighbor node disconnects.

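Condensing the thread: the flag is raised by any of three events. This is just a restatement of the hunks above in one place, not new logic:

```go
// 1. the storage radius moved
changed := newRadius != prevRadius

// 2. a previously unknown peer connected inside the radius (a new neighbor)
if _, ok := p.syncPeers[addr.ByteString()]; !ok && po >= newRadius {
	changed = true
}

// 3. a neighbor (under the smaller of the old and new radius) disconnected
if peer.po >= min(newRadius, prevRadius) {
	changed = true
}
```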
 		prevRadius = newRadius
 
 		// peersDisconnected is used to mark and prune peers that are no longer connected.
 		peersDisconnected := maps.Clone(p.syncPeers)
 
+		// make a pullsync binary tree of the neighbors, keyed by their addresses
+		bt := newPeerTreeNode(nil, nil, newRadius)
 		_ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) {
-			if _, ok := p.syncPeers[addr.ByteString()]; !ok {
-				p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po)
-			}
+			syncPeer, ok := p.syncPeers[addr.ByteString()]
+			if !ok {
+				syncPeer = newSyncPeer(addr, p.bins, po)
+				p.syncPeers[addr.ByteString()] = syncPeer
+				if po >= newRadius {
+					changed = true
+				}
+			} else {
+				syncPeer.syncBins = make([]bool, p.bins)
+			}
+			if po >= newRadius {
+				_ = bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins})
Review comment (Contributor): Why do we not also set changed = true here?

Reply (Member, author): Because at this point the pullsync parameters may not have changed in a way that requires reinitializing syncing, but we still want to build up the tree with the neighbor peers in case the neighborhood peerset did eventually change.

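So the shape of changeCheck is build-then-assign: the tree is rebuilt from the current neighborhood on every pass, while the bin assignment and sync restart only run when something actually changed. A compressed sketch using only names from this diff:

```go
bt := newPeerTreeNode(nil, nil, newRadius)
// ... bt.Put(...) for every connected peer with po >= newRadius ...
if changed {
	_ = bt.BinAssignment() // fold the tree, filling each neighbor's syncBins
	p.recalcPeers(ctx)     // start/stop per-bin syncing accordingly
}
```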
+			}
 			delete(peersDisconnected, addr.ByteString())
 			return false, false, nil
 		}, topology.Select{})

 		for _, peer := range peersDisconnected {
+			// if prevRadius was smaller, we also need to take the peer out of the tree
+			if peer.po >= min(newRadius, prevRadius) {
+				changed = true
+			}
 			p.disconnectPeer(peer.address)
 		}
 
-		p.recalcPeers(ctx, newRadius)
+		// assign bins to each peer for syncing if the peerset or the radius changed
+		if changed {
+			_ = bt.BinAssignment()
+			p.recalcPeers(ctx)
+		}
 	}
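
The peersDisconnected handling above is a mark-and-prune pass: clone the known peer set, unmark every peer still seen while iterating the connected peers, and whatever remains has disconnected. A self-contained toy version of the pattern (maps.Clone is Go 1.21+ stdlib; the peer names are made up):

```go
package main

import (
	"fmt"
	"maps"
)

func main() {
	known := map[string]bool{"peerA": true, "peerB": true, "peerC": true}
	connected := []string{"peerA", "peerC"}

	gone := maps.Clone(known) // mark: assume everyone disconnected
	for _, p := range connected {
		delete(gone, p) // unmark peers we still see
	}
	for p := range gone {
		fmt.Println("disconnected:", p) // prints: disconnected: peerB
	}
}
```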

 	tick := time.NewTicker(recalcPeersDur)
 	defer tick.Stop()
 
 	for {
 
-		onChange()
+		changeCheck()
 
 		select {
 		case <-ctx.Done():
@@ -204,26 +223,34 @@ func (p *Puller) disconnectPeer(addr swarm.Address) {

 // recalcPeers starts or stops the syncing process for peers per bin depending on the current sync radius.
 // Must be called under lock.
-func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) {
+func (p *Puller) recalcPeers(ctx context.Context) {
 	var wg sync.WaitGroup
 	for _, peer := range p.syncPeers {
 		wg.Add(1)
 		p.wg.Add(1)
 		go func(peer *syncPeer) {
 			defer p.wg.Done()
 			defer wg.Done()
-			if err := p.syncPeer(ctx, peer, storageRadius); err != nil {
+			if err := p.syncPeer(ctx, peer); err != nil {
 				p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err)
 			}
 		}(peer)
 	}
 	wg.Wait()
 }

-func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error {
+func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer) error {
 	peer.mtx.Lock()
 	defer peer.mtx.Unlock()
 
+	if peer.po < p.radius.StorageRadius() { // no bin is assigned for syncing
+		peer.stop()
+		return nil
+	}
+
+	// If the peer's epoch has changed (indicating a reserve reset or storage change on the peer):
+	//   - Cancel all ongoing bin syncs for this peer.
+	//   - Reset all previously synced intervals for this peer (to force a fresh sync).
+	// This guarantees that the sync state is consistent with the peer's current reserve and avoids pulling stale or irrelevant data.
 	if peer.cursors == nil {
 		cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address)
 		if err != nil {
@@ -258,40 +285,20 @@
}
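
The diff elides the body that the epoch comment above describes. A hedged sketch of the described behavior, where prevEpoch and resetIntervals are hypothetical placeholders and only GetCursors with its (cursors, epoch, err) return appears in this file:

```go
cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address)
if err != nil {
	return fmt.Errorf("get cursors: %w", err)
}
if epoch != prevEpoch { // hypothetical: the peer reset or replaced its reserve
	peer.stop()          // cancel all ongoing bin syncs for this peer
	resetIntervals(peer) // hypothetical helper: forget previously synced intervals
}
peer.cursors = cursors
```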

 /*
-	The syncing behavior diverges for peers outside and within the storage radius.
-	For neighbor peers, we sync ALL bins greater than or equal to the storage radius.
-	For peers with PO lower than the storage radius, we must sync ONLY the bin that is the PO.
-	For peers with PO lower than the storage radius and even lower than the allowed minimum threshold,
-	no syncing is done.
+	All chunks above the storage radius need to be synchronized only once.
+	For that, the puller must be aware of how the neighbors together cover the chunks with their bins.
+	Based on that information, the puller assigns bins to each peer to sync.
+	So at this point, all bins of a peer need to be known for syncing.
 */

-	if peer.po >= storageRadius {
-
-		// cancel all bins lower than the storage radius
-		for bin := uint8(0); bin < storageRadius; bin++ {
-			peer.cancelBin(bin)
-		}
-
-		// sync all bins >= storage radius
-		for bin, cur := range peer.cursors {
-			if bin >= int(storageRadius) && !peer.isBinSyncing(uint8(bin)) {
-				p.syncPeerBin(ctx, peer, uint8(bin), cur)
-			}
-		}
-
-	} else if storageRadius-peer.po <= maxPODelta {
-		// cancel all non-po bins, if any
-		for bin := uint8(0); bin < p.bins; bin++ {
-			if bin != peer.po {
-				peer.cancelBin(bin)
-			}
-		}
-		// sync PO bin only
-		if !peer.isBinSyncing(peer.po) {
-			p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po])
-		}
-	} else {
-		peer.stop()
+	for bin, forSync := range peer.syncBins {
+		if forSync {
+			if !peer.isBinSyncing(uint8(bin)) {
+				p.syncPeerBin(ctx, peer, uint8(bin), peer.cursors[bin])
+			}
+		} else {
+			peer.cancelBin(uint8(bin))
+		}
 	}
 
 	return nil
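
A worked toy example of what the comment block and the new loop amount to; the concrete split below is illustrative, the real one is produced by BinAssignment. With p.bins = 4, storage radius 2, and two neighbors A and B, the fold might yield:

```go
radius, bins := uint8(2), uint8(4)
a := []bool{false, false, true, false} // neighbor A is asked to sync bin 2
b := []bool{false, false, false, true} // neighbor B is asked to sync bin 3

for bin := radius; bin < bins; bin++ {
	// every bin at or above the radius is covered by exactly one neighbor
	fmt.Printf("bin %d covered exactly once: %v\n", bin, a[bin] != b[bin])
}
```

Each bin at or above the radius is then pulled from exactly one neighbor, instead of every neighbor syncing all bins at or above the radius as the deleted branch did.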
@@ -540,7 +547,8 @@ type syncPeer struct {
 	address        swarm.Address
 	binCancelFuncs map[uint8]func() // context cancel funcs for historical sync, keyed by bin
 	po             uint8
-	cursors        []uint64
+	syncBins       []bool   // index is bin, value is whether to sync it; set while folding the puller neighborhood tree
+	cursors        []uint64 // index is bin, value is cursor
 
 	mtx sync.Mutex
 	wg  sync.WaitGroup
@@ -551,9 +559,14 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
 		address:        addr,
 		binCancelFuncs: make(map[uint8]func(), bins),
 		po:             po,
+		syncBins:       make([]bool, bins),
 	}
 }
 
+func (p *syncPeer) GetSyncBins() []bool {
+	return p.syncBins
+}
+
 // called when peer disconnects or on shutdown, cleans up ongoing sync operations
 func (p *syncPeer) stop() {
 	for bin, c := range p.binCancelFuncs {