Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ func (d *Daemon) Run() error {
// Cleanup.
log.Println("daemon: shutting down")
d.listener.Close()
d.idleWatcher.StopAll()
if err := d.idleWatcher.StopAllAndWaitTimeout(5 * time.Second); err != nil {
log.Printf("daemon: %v", err)
}
cancel()
d.closeAllClients()
d.closeProviders()
Expand Down
35 changes: 34 additions & 1 deletion fetcher/idle.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fetcher

import (
"errors"
"log"
"strings"
"sync"
Expand All @@ -19,10 +20,14 @@ type IdleUpdate struct {
// IdleWatcher manages IDLE connections for multiple accounts.
type IdleWatcher struct {
mu sync.Mutex
wg sync.WaitGroup
watchers map[string]*accountIdle // key: account ID
notify chan<- IdleUpdate
}

// ErrStopTimeout is returned when IDLE watcher goroutines do not stop before the timeout.
var ErrStopTimeout = errors.New("idle watcher: stop timed out")

// accountIdle manages a single IDLE connection for one account.
type accountIdle struct {
account *config.Account
Expand Down Expand Up @@ -60,7 +65,11 @@ func (w *IdleWatcher) Watch(account *config.Account, folder string) {
done: make(chan struct{}),
}
w.watchers[account.ID] = a
go a.run()
w.wg.Add(1)
go func() {
defer w.wg.Done()
a.run()
}()
}

// Stop stops the IDLE watcher for a specific account.
Expand Down Expand Up @@ -100,6 +109,30 @@ func (w *IdleWatcher) StopAllAndWait() {
for _, done := range pending {
<-done
}
w.wg.Wait()
}

// StopAllAndWaitTimeout stops all IDLE watchers and waits for them to finish up to d.
func (w *IdleWatcher) StopAllAndWaitTimeout(d time.Duration) error {
w.mu.Lock()
for id, a := range w.watchers {
close(a.stop)
delete(w.watchers, id)
}
w.mu.Unlock()

done := make(chan struct{})
go func() {
w.wg.Wait()
close(done)
}()

select {
case <-done:
return nil
case <-time.After(d):
return ErrStopTimeout
}
}

func (a *accountIdle) run() {
Expand Down
72 changes: 72 additions & 0 deletions fetcher/idle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package fetcher

import (
"errors"
"sync/atomic"
"testing"
"time"

"github.com/floatpane/matcha/config"
)

func TestIdleWatcher_StopAllAndWait_TracksReplacedGoroutines(t *testing.T) {
w := NewIdleWatcher(make(chan IdleUpdate))
stopCh := make(chan struct{})
doneCh := make(chan struct{})
var exits atomic.Int64

w.wg.Add(1)
go func() {
defer w.wg.Done()
defer close(doneCh)
<-stopCh
exits.Add(1)
}()

w.watchers["acct"] = &accountIdle{
account: &config.Account{ID: "acct"},
stop: stopCh,
done: doneCh,
}

if err := w.StopAllAndWaitTimeout(time.Second); err != nil {
t.Fatalf("StopAllAndWaitTimeout returned error: %v", err)
}
if got := exits.Load(); got != 1 {
t.Fatalf("expected synthetic watcher to exit once, got %d", got)
}
}

func TestIdleWatcher_StopAllAndWaitTimeout_ReturnsOnSlowExit(t *testing.T) {
w := NewIdleWatcher(make(chan IdleUpdate))
stopCh := make(chan struct{})
doneCh := make(chan struct{})
release := make(chan struct{})
exited := make(chan struct{})

w.wg.Add(1)
go func() {
defer w.wg.Done()
defer close(doneCh)
defer close(exited)
<-release
}()

w.watchers["acct"] = &accountIdle{
account: &config.Account{ID: "acct"},
stop: stopCh,
done: doneCh,
}

err := w.StopAllAndWaitTimeout(50 * time.Millisecond)
if !errors.Is(err, ErrStopTimeout) {
t.Fatalf("expected ErrStopTimeout, got %v", err)
}

close(release)
select {
case <-exited:
case <-time.After(time.Second):
t.Fatal("synthetic watcher did not exit during cleanup")
}
}
Loading