Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions skipmaskmanager/skip_mask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,13 @@ func New(
func (smm *SkipMaskManager) Start(fracs fractionAcquirer) {
smm.createDataDir()

err := smm.loadSkipMasks()
allFracs := fracs.Fractions()
err := smm.loadSkipMasks(allFracs.Names())
if err != nil {
logger.Fatal("failed to load previous skip masks", zap.Error(err))
}

err = smm.buildQueue(fracs.Fractions())
err = smm.buildQueue(allFracs)
if err != nil {
logger.Fatal("failed to build skip mask manager queue", zap.Error(err))
}
Expand Down Expand Up @@ -288,7 +289,6 @@ func (smm *SkipMaskManager) RefreshFrac(fraction frac.Fraction) {
// This should be called when a fraction is deleted from the system.
// The removal is performed asynchronously in the background.
func (smm *SkipMaskManager) RemoveFrac(fracName string) {
// TODO: we might want to have some kind of GC on startup to clean up missed files
smm.bgWG.Go(func() {
smm.fracsMu.RLock()
fracsFiles, has := smm.fracs[fracName]
Expand Down Expand Up @@ -340,12 +340,17 @@ func (smm *SkipMaskManager) addDoneFrac(fracName, fracPath string) {
// - Registers completed skip masks (.skipmask files)
//
// This allows the manager to resume processing after a restart.
func (smm *SkipMaskManager) loadSkipMasks() error {
func (smm *SkipMaskManager) loadSkipMasks(fracNames []string) error {
des, err := os.ReadDir(smm.config.DataDir)
if err != nil {
return err
}

fracNamesSet := make(map[string]struct{}, len(fracNames))
for _, fracName := range fracNames {
fracNamesSet[fracName] = struct{}{}
}

var anyRemove bool

for _, de := range des {
Expand All @@ -354,7 +359,7 @@ func (smm *SkipMaskManager) loadSkipMasks() error {
}

if _, ok := smm.skipMasks[de.Name()]; !ok {
logger.Info("there is skip mask folder on disk, but not in config. need to delete it.")
logger.Info("found skip mask folder on disk, but not in config. remove it", zap.String("path", de.Name()))
err := os.RemoveAll(path.Join(smm.config.DataDir, de.Name()))
if err != nil && !os.IsNotExist(err) {
return err
Expand All @@ -378,13 +383,26 @@ func (smm *SkipMaskManager) loadSkipMasks() error {
if smde.IsDir() {
continue
}

name := smde.Name()
fracName := fracNameFromFilePath(name)

// remove missed files
if _, ok := fracNamesSet[fracName]; !ok {
err := os.RemoveAll(path.Join(sm.dirPath, name))
if err != nil && !os.IsNotExist(err) {
return err
}
logger.Info("found missed skip mask file. remove it", zap.String("path", name))
anyRemove = true
continue
}

switch path.Ext(name) {
case fracInQueueExt:
hasFracsInQueue = true
case fracDoneExt:
smm.addDoneFrac(fracNameFromFilePath(name), path.Join(sm.dirPath, name))
smm.addDoneFrac(fracName, path.Join(sm.dirPath, name))
}
}

Expand Down
Loading