feat(swip25): pull syncing optimization #5194
base: master
Changes from all commits: 98b5f12, 792b424, dcb4797, e8ab112, fce1ee0, 3a57f37, cc530ac, 65e6c4d, d32cc32, f91d049, 2e5275f, 22fc8d9, 5bbcd9a, 2f47a58, 3f59c8e, 5486ef0, 82cb9ad, bd7cb9a, ee65186, 237c510, 07ea327, ea8609f, 1b4a6f3
```diff
@@ -40,8 +40,6 @@ const (
 	recalcPeersDur = time.Minute * 5

 	maxChunksPerSecond = 1000 // roughly 4 MB/s
-
-	maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing.
 )

 type Options struct {
```
```diff
@@ -134,7 +132,8 @@ func (p *Puller) manage(ctx context.Context) {

 	var prevRadius uint8

-	onChange := func() {
+	// change in radius or in neighborhood peerset
+	changeCheck := func() {
 		p.syncPeersMtx.Lock()
 		defer p.syncPeersMtx.Unlock()
```
```diff
@@ -152,32 +151,52 @@ func (p *Puller) manage(ctx context.Context) {
 			}
 			p.logger.Debug("radius decrease", "old_radius", prevRadius, "new_radius", newRadius)
 		}
+		changed := newRadius != prevRadius
 		prevRadius = newRadius

 		// peersDisconnected is used to mark and prune peers that are no longer connected.
 		peersDisconnected := maps.Clone(p.syncPeers)

+		// make pullsync binary tree of neighbors by their address
+		bt := newPeerTreeNode(nil, nil, newRadius)
 		_ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) {
-			if _, ok := p.syncPeers[addr.ByteString()]; !ok {
-				p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po)
+			syncPeer, ok := p.syncPeers[addr.ByteString()]
+			if !ok {
+				syncPeer = newSyncPeer(addr, p.bins, po)
+				p.syncPeers[addr.ByteString()] = syncPeer
+				if po >= newRadius {
+					changed = true
+				}
+			} else {
+				syncPeer.syncBins = make([]bool, p.bins)
+			}
+			if po >= newRadius {
+				_ = bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins})
```
**Contributor:** Why do we not also set `changed = true` here?

**Member (Author):** Because at this point the pull-sync parameters may not have changed in a way that requires re-initializing syncing, but we still want to build up the tree with the neighbor peers in case there turns out to be a change in the neighborhood peerset.
```diff
+			}
 			delete(peersDisconnected, addr.ByteString())
 			return false, false, nil
 		}, topology.Select{})

 		for _, peer := range peersDisconnected {
+			// if prevRadius was smaller, we need to take it out from the tree
+			if peer.po >= min(newRadius, prevRadius) {
+				changed = true
+			}
 			p.disconnectPeer(peer.address)
 		}

-		p.recalcPeers(ctx, newRadius)
+		// assign bins to each peer for syncing if peerset or radius changed
+		if changed {
+			_ = bt.BinAssignment()
+			p.recalcPeers(ctx)
+		}
 	}

 	tick := time.NewTicker(recalcPeersDur)
 	defer tick.Stop()

 	for {

-		onChange()
+		changeCheck()

 		select {
 		case <-ctx.Done():
```
```diff
@@ -204,26 +223,34 @@ func (p *Puller) disconnectPeer(addr swarm.Address) {

 // recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius.
 // Must be called under lock.
-func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) {
+func (p *Puller) recalcPeers(ctx context.Context) {
 	var wg sync.WaitGroup
 	for _, peer := range p.syncPeers {
 		wg.Add(1)
 		p.wg.Add(1)
 		go func(peer *syncPeer) {
 			defer p.wg.Done()
 			defer wg.Done()
-			if err := p.syncPeer(ctx, peer, storageRadius); err != nil {
+			if err := p.syncPeer(ctx, peer); err != nil {
 				p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err)
 			}
 		}(peer)
 	}
 	wg.Wait()
 }

-func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error {
+func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer) error {
 	peer.mtx.Lock()
 	defer peer.mtx.Unlock()

+	if peer.po < p.radius.StorageRadius() { // no bin is assigned for syncing
+		peer.stop()
+		return nil
+	}
+
+	// If the peer's epoch has changed (indicating a reserve reset or storage change on the peer):
+	// - Cancel all ongoing bin syncs for this peer.
+	// - Reset all previously synced intervals for this peer (to force a fresh sync).
+	// This guarantees that sync state is consistent with the peer's current reserve, and avoids pulling stale or irrelevant data.
 	if peer.cursors == nil {
 		cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address)
 		if err != nil {
```
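The hunk cuts off before the body of the epoch check, but the comment states its intent. Below is a minimal, self-contained sketch of that rule; it is an assumption about the shape of the logic, not code from this PR, and the types and field names are invented for illustration.

```go
package main

import "fmt"

// peerSyncState is a stand-in (hypothetical, not from the PR) for the bits of
// per-peer sync state that the epoch rule cares about.
type peerSyncState struct {
	storedEpoch uint64
	intervals   map[uint8]uint64 // bin -> last synced chunk index
	binsRunning map[uint8]bool   // bin -> whether a sync is in progress
}

// onCursors applies the rule described in the comment: if the epoch reported
// by the peer differs from the stored one, cancel ongoing bin syncs and drop
// the previously synced intervals so syncing starts fresh.
func (s *peerSyncState) onCursors(peerEpoch uint64) {
	if s.storedEpoch == peerEpoch {
		return // same reserve epoch, keep existing progress
	}
	for bin := range s.binsRunning {
		s.binsRunning[bin] = false // "cancel" the running sync in this sketch
	}
	s.intervals = make(map[uint8]uint64)
	s.storedEpoch = peerEpoch
}

func main() {
	s := &peerSyncState{
		storedEpoch: 1,
		intervals:   map[uint8]uint64{12: 4096},
		binsRunning: map[uint8]bool{12: true},
	}
	s.onCursors(2)                                   // peer reports a new epoch
	fmt.Println(len(s.intervals), s.binsRunning[12]) // 0 false
}
```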
```diff
@@ -258,40 +285,20 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error {
 	}

 	/*
-		The syncing behavior diverges for peers outside and within the storage radius.
-		For neighbor peers, we sync ALL bins greater than or equal to the storage radius.
-		For peers with PO lower than the storage radius, we must sync ONLY the bin that is the PO.
-		For peers with PO lower than the storage radius and even lower than the allowed minimum threshold,
-		no syncing is done.
+		All chunks above the storage radius need to be synchronized only once.
+		For that, the puller must be aware of how the neighbors together cover the chunks by their bins.
+		Based on that information, the puller assigns bins to each peer for syncing,
+		so at this point all bins must be known for a peer.
 	*/

-	if peer.po >= storageRadius {
-
-		// cancel all bins lower than the storage radius
-		for bin := uint8(0); bin < storageRadius; bin++ {
-			peer.cancelBin(bin)
-		}
-
-		// sync all bins >= storage radius
-		for bin, cur := range peer.cursors {
-			if bin >= int(storageRadius) && !peer.isBinSyncing(uint8(bin)) {
-				p.syncPeerBin(ctx, peer, uint8(bin), cur)
-			}
-		}
-
-	} else if storageRadius-peer.po <= maxPODelta {
-		// cancel all non-po bins, if any
-		for bin := uint8(0); bin < p.bins; bin++ {
-			if bin != peer.po {
-				peer.cancelBin(bin)
-			}
-		}
-		// sync PO bin only
-		if !peer.isBinSyncing(peer.po) {
-			p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po])
-		}
-	} else {
-		peer.stop()
+	for bin, forSync := range peer.syncBins {
+		if forSync {
+			if !peer.isBinSyncing(uint8(bin)) {
+				p.syncPeerBin(ctx, peer, uint8(bin), peer.cursors[bin])
+			}
+		} else {
+			peer.cancelBin(uint8(bin))
+		}
 	}

 	return nil
```
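The new loop relies on `syncBins` having been filled in by the tree's `BinAssignment` during `changeCheck`. The tree implementation itself is not part of this diff; as a rough illustration of the invariant it is meant to enforce (each bin at or above the storage radius is pulled from exactly one neighbor), here is a deliberately naive, hypothetical assignment. The round-robin below is a stand-in, not the PR's binary-tree algorithm.

```go
package main

import "fmt"

// neighbor is a stand-in for syncPeer: syncBins is indexed by bin and marks
// the bins this peer is chosen to sync.
type neighbor struct {
	name     string
	syncBins []bool
}

// assignBins hands out every bin at or above the storage radius to exactly
// one neighbor (naive round-robin; the PR uses a binary tree of addresses).
func assignBins(peers []*neighbor, bins, radius int) {
	for bin := radius; bin < bins; bin++ {
		p := peers[(bin-radius)%len(peers)]
		p.syncBins[bin] = true
	}
}

func main() {
	const bins, radius = 8, 5
	peers := []*neighbor{
		{name: "a", syncBins: make([]bool, bins)},
		{name: "b", syncBins: make([]bool, bins)},
	}
	assignBins(peers, bins, radius)
	for _, p := range peers {
		fmt.Println(p.name, p.syncBins) // a gets bins 5 and 7, b gets bin 6
	}
}
```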
```diff
@@ -540,7 +547,8 @@ type syncPeer struct {
 	address        swarm.Address
 	binCancelFuncs map[uint8]func() // slice of context cancel funcs for historical sync. index is bin
 	po             uint8
-	cursors        []uint64
+	syncBins       []bool   // index is bin, value is syncing or not. will be set during folding in the puller neighborhood tree
+	cursors        []uint64 // index is bin, value is cursor

 	mtx sync.Mutex
 	wg  sync.WaitGroup
```
```diff
@@ -551,9 +559,14 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
 		address:        addr,
 		binCancelFuncs: make(map[uint8]func(), bins),
 		po:             po,
+		syncBins:       make([]bool, bins),
 	}
 }

+func (p *syncPeer) GetSyncBins() []bool {
+	return p.syncBins
+}
+
 // called when peer disconnects or on shutdown, cleans up ongoing sync operations
 func (p *syncPeer) stop() {
 	for bin, c := range p.binCancelFuncs {
```
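One detail worth noting about `GetSyncBins` together with the earlier `bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins})`: the tree node appears to hold the very slice the peer owns, so marking a bin during `BinAssignment` becomes visible to the peer without copying results back. That reading of the intent is an assumption; a tiny sketch of the slice-sharing behavior it relies on:

```go
package main

import "fmt"

// treeValue mimics peerTreeNodeValue: it stores the same []bool slice that
// the peer holds, not a copy.
type treeValue struct{ SyncBins []bool }

func main() {
	peerBins := make([]bool, 4)           // the peer's syncBins
	node := treeValue{SyncBins: peerBins} // shared slice header

	node.SyncBins[2] = true // the tree assigns bin 2 to this peer

	fmt.Println(peerBins) // [false false true false] — visible on the peer side
}
```

This would also explain why `syncPeer.syncBins` is replaced with a fresh `make([]bool, p.bins)` before the tree is rebuilt: presumably the new, zeroed slice clears old assignments before the next `BinAssignment` fills it.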
**Contributor:** Can you please rename this to something like `radiusChanged` or `rChanged`? It would make it easier to figure out in the code what this boolean is all about.

**Member (Author):** It relates to the fact that the parameters for pulling have changed. That partially covers a radius change, but it also covers the cases where a newly connected peer is a neighbor, or a neighbor node is disconnected.
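Schematically, the reply above amounts to three triggers. The following is a compact restatement, not code from the PR; `needsReassignment` and its parameters are invented for illustration (the real checks happen inline in `changeCheck`), and the built-in `min` requires Go 1.21+.

```go
package main

import "fmt"

// needsReassignment restates the three cases from the reply above: the radius
// moved, a newly connected peer is a neighbor, or a neighbor disconnected.
func needsReassignment(prevRadius, newRadius uint8, newPeerPOs, gonePeerPOs []uint8) bool {
	if newRadius != prevRadius {
		return true // radius changed
	}
	for _, po := range newPeerPOs {
		if po >= newRadius {
			return true // a new neighbor connected
		}
	}
	for _, po := range gonePeerPOs {
		if po >= min(newRadius, prevRadius) {
			return true // a neighbor disconnected
		}
	}
	return false
}

func main() {
	fmt.Println(needsReassignment(10, 10, nil, []uint8{11})) // true: a neighbor left
	fmt.Println(needsReassignment(10, 10, []uint8{7}, nil))  // false: the new peer is outside the neighborhood
}
```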