Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 80 additions & 37 deletions pkg/datastore/target/gnmi/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gnmi
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/openconfig/gnmi/proto/gnmi"
Expand All @@ -25,7 +25,8 @@ import (

const (
syncSignalBufferSize = 1
notificationSendTimeout = 5 * time.Second
defaultUpdChanBufSize = 20
defaultNotifSendTimeout = 5 * time.Second
notificationSlowSendWarnTime = 500 * time.Millisecond
)

Expand All @@ -37,6 +38,13 @@ type StreamSync struct {
runningStore types.RunningStore
schemaClient dsutils.SchemaClientBound
vpoolFactory pool.VirtualPoolFactory

// updChanBufSize controls the notification channel buffer. Default is
// defaultUpdChanBufSize; tests may override before calling Start.
updChanBufSize int
// notifSendTimeout is the per-notification delivery deadline. Default is
// defaultNotifSendTimeout; tests may override before calling Start.
notifSendTimeout time.Duration
}

func NewStreamSync(ctx context.Context, target SyncTarget, c *config.SyncProtocol, runningStore types.RunningStore, schemaClient dsutils.SchemaClientBound, vpoolFactory pool.VirtualPoolFactory) *StreamSync {
Expand All @@ -47,13 +55,15 @@ func NewStreamSync(ctx context.Context, target SyncTarget, c *config.SyncProtoco
ctx = logger.IntoContext(ctx, log)

return &StreamSync{
config: c,
target: target,
cancel: cancel,
runningStore: runningStore,
schemaClient: schemaClient,
ctx: ctx,
vpoolFactory: vpoolFactory,
config: c,
target: target,
cancel: cancel,
runningStore: runningStore,
schemaClient: schemaClient,
ctx: ctx,
vpoolFactory: vpoolFactory,
updChanBufSize: defaultUpdChanBufSize,
notifSendTimeout: defaultNotifSendTimeout,
}
}

Expand Down Expand Up @@ -110,7 +120,7 @@ func (s *StreamSync) Start() error {
log := logger.FromContext(s.ctx)
log.Info("Starting Sync")

updChan := make(chan *NotificationData, 20)
updChan := make(chan *NotificationData, s.updChanBufSize)

// Keep a single pending sync signal to avoid blocking the subscribe loop.
syncResponse := make(chan struct{}, syncSignalBufferSize)
Expand All @@ -128,22 +138,32 @@ func (s *StreamSync) Start() error {
return nil
}

// buildTreeSyncWithDatastore accumulates incoming notifications into a local
// syncTree and commits it to Running on SyncResponse and every ticker tick.
//
// The commit is handed off to a background goroutine so the main select loop
// never blocks on ApplyToRunning. This prevents notifications from timing out
// and being silently dropped while a slow performRevert holds the goroutine
// (see GitHub issue #440).
func (s *StreamSync) buildTreeSyncWithDatastore(cUS <-chan *NotificationData, syncResponse <-chan struct{}) {
log := logger.FromContext(s.ctx)
syncTree, err := s.runningStore.NewEmptyTree(s.ctx)
if err != nil {
log.Error(err, "failure creating new sync tree")
return
}
syncTreeMutex := &sync.Mutex{}

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// disable ticker until after the initial full sync is done
tickerActive := false

uif := treetypes.NewUpdateInsertFlags()

// tickerChan starts as nil (receives from a nil channel block forever in
// Go, so the ticker case never fires). It is set to a real ticker after
// the initial SyncResponse so the ticker only runs post-initial-sync.
var tickerChan <-chan time.Time

// syncRunning is true while a syncToRunning goroutine is in flight.
// The ticker case skips when true; the syncResponse case always proceeds.
var syncRunning atomic.Bool

for {
select {
case <-s.ctx.Done():
Expand All @@ -159,21 +179,42 @@ func (s *StreamSync) buildTreeSyncWithDatastore(cUS <-chan *NotificationData, sy
}
syncTree.GetTreeContext().ExplicitDeletes().Add(consts.RunningIntentName, consts.RunningValuesPrio, noti.deletes)
case <-syncResponse:
syncTree, err = s.syncToRunning(syncTree, syncTreeMutex, true)
tickerActive = true
treeToCommit := syncTree
syncTree, err = s.runningStore.NewEmptyTree(s.ctx)
if err != nil {
log.Error(err, "failed committing synctree to running")
log.Error(err, "failure creating new sync tree after sync response")
return
}
if tickerChan == nil {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
tickerChan = t.C
}
case <-ticker.C:
if !tickerActive {
log.Info("Skipping a sync tick - initial sync not finished yet")
syncRunning.Store(true)
go func() {
defer syncRunning.Store(false)
if err := s.syncToRunning(treeToCommit, true); err != nil {
log.Error(err, "failed committing synctree to running")
}
}()
case <-tickerChan:
if syncRunning.Load() {
continue
}
log.Info("SyncRunning due to ticker")
syncTree, err = s.syncToRunning(syncTree, syncTreeMutex, true)
treeToCommit := syncTree
syncTree, err = s.runningStore.NewEmptyTree(s.ctx)
if err != nil {
log.Error(err, "failed committing synctree to running")
log.Error(err, "failure creating new sync tree after ticker")
return
}
syncRunning.Store(true)
go func() {
defer syncRunning.Store(false)
if err := s.syncToRunning(treeToCommit, true); err != nil {
log.Error(err, "failed committing synctree to running")
}
}()
}
}
}
Expand All @@ -186,8 +227,8 @@ func (s *StreamSync) gnmiSubscribe(subReq *gnmi.SubscribeRequest, updChan chan<-
respChan, errChan := s.target.Subscribe(s.ctx, subReq, s.config.Name)

taskPool := s.vpoolFactory.NewVirtualPool(pool.VirtualTolerant)
defer taskPool.CloseForSubmit()
taskParams := NewNotificationProcessorTaskParameters(updChan, s.schemaClient)
defer func() { taskPool.CloseForSubmit() }()
taskParams := NewNotificationProcessorTaskParameters(updChan, s.schemaClient, s.notifSendTimeout)

syncStartTime := time.Now()
for {
Expand Down Expand Up @@ -228,22 +269,22 @@ func (s *StreamSync) gnmiSubscribe(subReq *gnmi.SubscribeRequest, updChan chan<-
}
}

func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, m *sync.Mutex, logCount bool) (*tree.RootEntry, error) {
// syncToRunning exports syncTree and applies it to Running. It is called from
// background goroutines spawned by buildTreeSyncWithDatastore; the caller owns
// syncTree exclusively and no mutex is needed.
func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, logCount bool) error {
log := logger.FromContext(s.ctx)
m.Lock()
defer m.Unlock()

startTime := time.Now()
result, err := ops.TreeExport(syncTree.Entry, consts.RunningIntentName, consts.RunningValuesPrio, false)
log.V(logger.VTrace).Info("exported tree", "tree", result.String())
if err != nil {
if errors.Is(err, ops.ErrorIntentNotPresent) {
log.Info("sync no config changes")
// all good no data present
return syncTree, nil
return nil
}
log.Error(err, "sync tree export error")
return s.runningStore.NewEmptyTree(s.ctx)
return err
}
// extract the explicit deletes
deletes := result.ExplicitDeletes
Expand All @@ -259,10 +300,10 @@ func (s *StreamSync) syncToRunning(syncTree *tree.RootEntry, m *sync.Mutex, logC
err = s.runningStore.ApplyToRunning(s.ctx, deletes, proto.NewProtoTreeImporter(result))
if err != nil {
log.Error(err, "failed importing sync to running")
return s.runningStore.NewEmptyTree(s.ctx)
return err
}
log.V(logger.VTrace).Info("import to running tree done", "duration", time.Since(startTime).String())
return s.runningStore.NewEmptyTree(s.ctx)
return nil
}

type SyncTarget interface {
Expand All @@ -282,12 +323,14 @@ type notificationProcessorTask struct {
type NotificationProcessorTaskParameters struct {
notificationResult chan<- *NotificationData
schemaClientBound dsutils.SchemaClientBound
sendTimeout time.Duration
}

func NewNotificationProcessorTaskParameters(notificationResult chan<- *NotificationData, scb dsutils.SchemaClientBound) *NotificationProcessorTaskParameters {
func NewNotificationProcessorTaskParameters(notificationResult chan<- *NotificationData, scb dsutils.SchemaClientBound, sendTimeout time.Duration) *NotificationProcessorTaskParameters {
return &NotificationProcessorTaskParameters{
notificationResult: notificationResult,
schemaClientBound: scb,
sendTimeout: sendTimeout,
}
}

Expand All @@ -310,8 +353,8 @@ func (t *notificationProcessorTask) sendNotificationData(ctx context.Context, da
log.Info("slow notification delivery to sync loop", "duration", duration.String())
}
return nil
case <-time.After(notificationSendTimeout):
log.Error(context.DeadlineExceeded, "notification delivery timeout, dropping update", "timeout", notificationSendTimeout.String())
case <-time.After(t.params.sendTimeout):
log.Error(context.DeadlineExceeded, "notification delivery timeout, dropping update", "timeout", t.params.sendTimeout.String())
return nil
}
}
Expand Down
Loading
Loading