Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions pkg/datastore/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package datastore

import (
"context"
"fmt"

"github.com/sdcio/data-server/pkg/pool"
"github.com/sdcio/data-server/pkg/tree"
"github.com/sdcio/data-server/pkg/tree/importer"
treetypes "github.com/sdcio/data-server/pkg/tree/types"
Expand All @@ -17,21 +19,67 @@ func (d *Datastore) ApplyToRunning(ctx context.Context, deletes []*sdcpb.Path, i

d.syncTreeMutex.Lock()
defer d.syncTreeMutex.Unlock()

// create a virtual task pool for delete operations
deleteMarkerPool := d.taskPool.NewVirtualPool(pool.VirtualFailFast, 1)
for _, delete := range deletes {
err := d.syncTree.DeleteBranch(ctx, delete, tree.RunningIntentName)
// navigate to delete path
deleteRoot, err := d.syncTree.NavigateSdcpbPath(ctx, delete)
if err != nil {
log.Error(err, "failed deleting path from datastore sync tree", "severity", "WARN", "path", delete.ToXPath(false))
log.Error(err, "failed navigating to delete path", "path", delete.ToXPath(false))
continue
}
// apply delete marker, setting owner delete flag on running intent
err = tree.NewOwnerDeleteMarker(tree.NewOwnerDeleteMarkerTaskConfig(tree.RunningIntentName, false)).Run(deleteRoot, deleteMarkerPool)
if err != nil {
log.Error(err, "failed applying delete to path", "path", delete.ToXPath(false))
continue
}
}
// close the delete marker pool for submission
deleteMarkerPool.CloseForSubmit()
deleteMarkerPool.Wait()
err := deleteMarkerPool.FirstError()
if err != nil {
return err
}

// import new config if provided
if importer != nil {
err := d.syncTree.ImportConfig(ctx, &sdcpb.Path{}, importer, tree.RunningIntentName, tree.RunningValuesPrio, treetypes.NewUpdateInsertFlags())
if err != nil {
return err
}
}

// create a virtual task pool for remove deleted operations
removeDeletedPool := d.taskPool.NewVirtualPool(pool.VirtualFailFast, 1)

// run remove deleted processor to clean up entries marked as deleted by owner
delProcessorParams := tree.NewRemoveDeletedProcessorParameters(tree.RunningIntentName)
err = tree.NewRemoveDeletedProcessor(delProcessorParams).Run(d.syncTree.GetRoot(), removeDeletedPool)
if err != nil {
return err
}

// close the remove deleted pool for submission
removeDeletedPool.CloseForSubmit()
removeDeletedPool.Wait()
err = removeDeletedPool.FirstError()
if err != nil {
return err
}

// delete entries that have zero-length leaf variant entries after remove deleted processing
for _, e := range delProcessorParams.GetZeroLengthLeafVariantEntries() {
fmt.Println("entry has zero-length leaf variant entries after remove deleted", "entry", e.SdcpbPath().ToXPath(false))

err := e.GetParent().DeleteBranch(ctx, &sdcpb.Path{Elem: []*sdcpb.PathElem{sdcpb.NewPathElem(e.PathName(), nil)}}, tree.RunningIntentName)
if err != nil {
return err
}
}

// conditional trace logging
if log := log.V(logger.VTrace); log.Enabled() {
treeExport, err := d.syncTree.TreeExport(tree.RunningIntentName, tree.RunningValuesPrio, false)
Expand Down
Loading
Loading