Skip to content

Commit 3f4a6f7

Browse files
knopers8teo
authored and committed
[core] avoid stuck updateTaskStatus due to multiple mesos updates
The issue described in OCTRL-953 occurs when we schedule a kill of a task and, while waiting for an acknowledgment, we receive two mesos updates: ``` [2024-11-11T12:14:02+01:00] TRACE scheduler: task status update received detector=TRG message= srcHost=alio2-cr1-flp163 state=TASK_FAILED task=2qwA9EYEgnY [2024-11-11T12:14:02+01:00] TRACE scheduler: task status update received detector=TRG message=Reconciliation: Task is unknown to the agent srcHost=alio2-cr1-flp163 state=TASK_LOST task=2qwA9EYEgnY ``` Both of these then trigger the discussed ack. Since it is unclear to me whether we can safely ignore TASK_LOST and trust that we will always receive either TASK_FAILED or TASK_FINISHED, I went for the approach of improving safeAcks to produce an error when subsequent acks are sent instead of blocking some goroutines.
1 parent 1dfb714 commit 3f4a6f7

File tree

5 files changed

+264
-97
lines changed

5 files changed

+264
-97
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))
7070

7171
GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
7272
SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut
73-
TEST_DIRS := ./apricot/local ./common/gera ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
73+
TEST_DIRS := ./apricot/local ./common/gera ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
7474
GO_TEST_DIRS := ./core/repos ./core/integration/dcs
7575

7676
coverage:COVERAGE_PREFIX := ./coverage_results

common/utils/safeacks/safeacks.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2020 CERN and copyright holders of ALICE O².
5+
* Author: Miltiadis Alexis <miltiadis.alexis@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package safeacks
26+
27+
import (
28+
"fmt"
29+
"sync"
30+
)
31+
32+
// SafeAcks is a thread safe structure which allows to handle acknowledgment exchanges
// with N senders and one receiver. The first sender succeeds, then an error is returned for the subsequent ones.
// This way, subsequent senders are not stuck sending an acknowledgment when nothing expects it anymore.
// The signaling design is inspired by point 2 in https://go101.org/article/channel-closing.html
// SafeAcks can be used to acknowledge that an action happened to the task such as task KILLED.
// At the moment we utilize SafeAcks to acknowledge that all the requested tasks were killed by mesos (task/manager.go).
type SafeAcks struct {
	mu   sync.RWMutex
	acks map[string]ackChannels
}

// ackChannels bundles the pair of channels backing one acknowledgment exchange.
type ackChannels struct {
	// the channel to send the ack to
	ack chan struct{}
	// the channel to close when acks are no longer expected
	stop chan struct{}
}

// deleteKey removes the channel pair registered for the given key, if any.
func (a *SafeAcks) deleteKey(key string) {
	a.mu.Lock()
	defer a.mu.Unlock()

	delete(a.acks, key)
}

// ExpectsAck reports whether an acknowledgment is currently registered
// (and not yet received) for the given key.
func (a *SafeAcks) ExpectsAck(key string) bool {
	a.mu.RLock()
	defer a.mu.RUnlock()

	_, ok := a.acks[key]

	return ok
}

// RegisterAck creates the channel pair needed for an acknowledgment exchange for the given key.
// It returns an error if an acknowledgment is already expected for that key,
// so callers cannot accidentally overwrite (and thereby orphan) an in-flight exchange.
func (a *SafeAcks) RegisterAck(key string) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	if _, hasKey := a.acks[key]; hasKey {
		return fmt.Errorf("an acknowledgment was already registered for key '%s'", key)
	}

	a.acks[key] = ackChannels{make(chan struct{}), make(chan struct{})}
	return nil
}

// getValue returns the channel pair registered for the given key.
// It only reads the map, so a read lock is sufficient (consistent with ExpectsAck);
// the previous exclusive Lock needlessly serialized concurrent senders and receivers.
func (a *SafeAcks) getValue(key string) (ackChannels ackChannels, ok bool) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	ackChannels, ok = a.acks[key]
	return
}

// TrySendAck checks if an acknowledgment is expected and if it is, it blocks until it is received.
// If an acknowledgment is not expected at the moment of the call (or already was received), nil is returned.
// If more than one goroutine attempts to send an acknowledgment before it is received, all but one goroutines will
// receive an error.
func (a *SafeAcks) TrySendAck(key string) error {
	channels, ok := a.getValue(key)
	if !ok {
		// fixme: perhaps we should return an error also here, but returning nil preserves the original behaviour
		// of safeAcks before the refactoring. Perhaps the rest of the code assumes it's ok to blindly try sending
		// an ack "just in case", so I would not change it lightly.
		return nil
	}

	// Either we win the race to deliver the ack, or the receiver already got one
	// and closed `stop`, in which case we report the duplicate instead of blocking forever.
	select {
	case <-channels.stop:
		return fmt.Errorf("an acknowledgment has been already received for key '%s'", key)
	case channels.ack <- struct{}{}:
		return nil
	}
}

// TryReceiveAck blocks until an acknowledgment is received and then returns true.
// It will return false if an acknowledgment for a given key is not expected.
func (a *SafeAcks) TryReceiveAck(key string) bool {
	channels, ok := a.getValue(key)
	if !ok {
		return false
	}
	<-channels.ack
	// Signal any other pending/future senders that nobody listens anymore,
	// then forget the key so it can be registered again.
	close(channels.stop)
	a.deleteKey(key)
	return true
}

// NewAcks returns an empty, ready-to-use SafeAcks instance.
func NewAcks() *SafeAcks {
	return &SafeAcks{
		acks: make(map[string]ackChannels),
	}
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package safeacks
2+
3+
import (
4+
. "github.com/onsi/ginkgo/v2"
5+
. "github.com/onsi/gomega"
6+
"sync"
7+
"testing"
8+
"time"
9+
)
10+
11+
// Ginkgo spec suite for SafeAcks; each It gets a fresh instance via BeforeEach.
var _ = Describe("SafeAcks", func() {
	var sa *SafeAcks

	BeforeEach(func() {
		sa = NewAcks()
	})

	Describe("RegisterAck", func() {
		It("should register a new ack", func(ctx SpecContext) {
			err := sa.RegisterAck("test")
			Expect(err).NotTo(HaveOccurred())
			Expect(sa.ExpectsAck("test")).To(BeTrue())
		}, SpecTimeout(5*time.Second))

		It("should return error when an ack is already registered", func(ctx SpecContext) {
			err := sa.RegisterAck("test")
			Expect(err).NotTo(HaveOccurred())
			Expect(sa.ExpectsAck("test")).To(BeTrue())

			// second registration for the same key must fail...
			err = sa.RegisterAck("test")
			Expect(err).To(HaveOccurred())

			// ...while leaving the original registration intact
			Expect(sa.ExpectsAck("test")).To(BeTrue())
		}, SpecTimeout(5*time.Second))
	})
	// TODO add timeout for this test
	Describe("TrySendAck and TryReceiveAck", func() {
		It("should return nil for non-existent key", func(ctx SpecContext) {
			err := sa.TrySendAck("nonexistent")
			Expect(err).To(BeNil())
		}, SpecTimeout(5*time.Second))

		It("should send ack successfully", func(ctx SpecContext) {
			err := sa.RegisterAck("test")
			Expect(err).NotTo(HaveOccurred())

			var wg sync.WaitGroup
			wg.Add(1)

			// sender blocks until the receiver below picks up the ack
			go func() {
				defer wg.Done()
				err := sa.TrySendAck("test")
				Expect(err).To(BeNil())
			}()
			Expect(sa.TryReceiveAck("test")).To(BeTrue())

			wg.Wait()
		}, SpecTimeout(5*time.Second))

		It("should return error when ack was already sent once", func(ctx SpecContext) {
			err := sa.RegisterAck("test")
			Expect(err).NotTo(HaveOccurred())

			result1 := make(chan error)
			result2 := make(chan error)
			// two competing senders for the same key; exactly one should win
			go func() {
				result1 <- sa.TrySendAck("test")
			}()

			go func() {
				result2 <- sa.TrySendAck("test")
			}()

			// I really don't like relying on a sleep call to test this, but I see no other way...
			// The goal is to have both `TrySendAck` blocked at channel send before invoking TryReceiveAck.
			// Hopefully 1 second is enough to avoid having a shaky test.
			time.Sleep(1000 * time.Millisecond)

			ok := sa.TryReceiveAck("test")
			Expect(ok).To(BeTrue())

			// XOR: exactly one of the two senders must have received an error
			oneErrorHaveOccured := (<-result1 != nil) != (<-result2 != nil)
			Expect(oneErrorHaveOccured).To(BeTrue())
		}, SpecTimeout(5*time.Second))
	})

	Describe("ExpectsAck", func() {
		It("should return false for non-existent key", func(ctx SpecContext) {
			Expect(sa.ExpectsAck("nonexistent")).To(BeFalse())
		}, SpecTimeout(5*time.Second))

		It("should return true for registered key", func(ctx SpecContext) {
			err := sa.RegisterAck("test")
			Expect(err).NotTo(HaveOccurred())
			Expect(sa.ExpectsAck("test")).To(BeTrue())
		}, SpecTimeout(5*time.Second))

		It("should not be permanently blocked by another call", func(ctx SpecContext) {
			err := sa.RegisterAck("test")
			Expect(err).NotTo(HaveOccurred())
			// this receiver blocks waiting for an ack that is never sent
			go func() {
				sa.TryReceiveAck("test")
			}()

			// I really don't like relying on a sleep call to test this, but I see no other way...
			// The goal is to have `TryReceiveAck` blocked at channel receive before invoking ExpectsAck.
			// Hopefully 1 second is enough to avoid having a shaky test.
			time.Sleep(1000 * time.Millisecond)

			// ExpectsAck must still answer while TryReceiveAck is parked on the channel
			Expect(sa.ExpectsAck("test")).To(BeTrue())
		}, SpecTimeout(5*time.Second))
	})
})
114+
115+
// TestSafeAcks is the standard `go test` entry point that hands control
// over to the Ginkgo runner for the spec suite defined above.
func TestSafeAcks(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Component SafeAcks Test Suite")
}

core/task/manager.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"context"
2929
"errors"
3030
"fmt"
31+
"github.com/AliceO2Group/Control/common/utils/safeacks"
3132
"os"
3233
"strings"
3334
"sync"
@@ -99,7 +100,7 @@ type Manager struct {
99100

100101
schedulerState *schedulerState
101102
internalEventCh chan<- event.Event
102-
ackKilledTasks *safeAcks
103+
ackKilledTasks *safeacks.SafeAcks
103104
killTasksMu sync.Mutex // to avoid races when attempting to kill the same tasks in different goroutines
104105
}
105106

@@ -141,7 +142,7 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
141142
taskman.cq = taskman.schedulerState.commandqueue
142143
taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
143144
taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
144-
taskman.ackKilledTasks = newAcks()
145+
taskman.ackKilledTasks = safeacks.NewAcks()
145146

146147
schedState.setupCli()
147148

@@ -1009,11 +1010,17 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
10091010
WithField("partition", envId.String()).
10101011
Warn("attempted status update of task not in roster")
10111012
}
1012-
if ack, ok := m.ackKilledTasks.getValue(taskId); ok {
1013-
ack <- struct{}{}
1014-
// close(ack) // It can even be left open?
1013+
err := m.ackKilledTasks.TrySendAck(taskId)
1014+
if err != nil {
1015+
log.WithField("taskId", taskId).
1016+
WithField("mesosStatus", status.GetState().String()).
1017+
WithField("level", infologger.IL_Devel).
1018+
WithField("status", status.GetState().String()).
1019+
WithField("reason", status.GetReason().String()).
1020+
WithField("detector", detector).
1021+
WithField("partition", envId.String()).
1022+
Warnf("%s", err)
10151023
}
1016-
10171024
return
10181025
}
10191026

@@ -1064,7 +1071,7 @@ func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
10641071
// If the task list includes locked tasks, TaskNotFoundError is returned.
10651072
func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) {
10661073
taskCanBeKilledFilter := func(t *Task) bool {
1067-
if t.IsLocked() || m.ackKilledTasks.contains(t.taskId) {
1074+
if t.IsLocked() || m.ackKilledTasks.ExpectsAck(t.taskId) {
10681075
return false
10691076
}
10701077
for _, id := range taskIds {
@@ -1090,17 +1097,19 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err
10901097
}
10911098

10921099
for _, id := range toKill.GetTaskIds() {
1093-
m.ackKilledTasks.addAckChannel(id)
1100+
err := m.ackKilledTasks.RegisterAck(id)
1101+
if err != nil {
1102+
log.WithField("level", infologger.IL_Devel).Warnf("failed to register ack for task '%s': %s", id, err)
1103+
}
10941104
}
10951105

10961106
killed, running, err = m.doKillTasks(toKill)
10971107
m.killTasksMu.Unlock()
10981108

10991109
for _, id := range killed.GetTaskIds() {
1100-
ack, ok := m.ackKilledTasks.getValue(id)
1101-
if ok {
1102-
<-ack
1103-
m.ackKilledTasks.deleteKey(id)
1110+
ok := m.ackKilledTasks.TryReceiveAck(id)
1111+
if !ok {
1112+
log.WithField("level", infologger.IL_Devel).Warnf("ack for task '%s' was never registered or was already received", id)
11041113
}
11051114
}
11061115
return

0 commit comments

Comments
 (0)