Skip to content

Commit a7698f6

Browse files
author
Michal Tichák
committed
[core] OCTRL-989
- removed `vars` from Ev_RunEvent - changing status and state of Task in kafka message only if event contains them - properly filling `traits` in Ev_TaskEvent from task.Traits
1 parent 54935c1 commit a7698f6

File tree

4 files changed

+25
-26
lines changed

4 files changed

+25
-26
lines changed

common/protos/events.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ message Ev_TaskEvent {
7878
string name = 1; // task name, based on the name of the task class
7979
string taskid = 2; // task id, unique
8080
string state = 3; // state machine state for this task
81-
string status = 4; // active/inactive etc.
81+
string status = 4; // posible values: ACTIVE/INACTIVE/PARTIAL/UNDEFINED/UNDEPLOYABLE as defined in status.go.
8282
string hostname = 5;
8383
string className = 6; // name of the task class from which this task was spawned
8484
Traits traits = 7;
@@ -99,7 +99,7 @@ message Ev_CallEvent {
9999

100100
message Ev_RoleEvent {
101101
string name = 1; // role name
102-
string status = 2; // active/inactive etc., derived from the state of child tasks, calls or other roles
102+
string status = 2; // posible values: ACTIVE/INACTIVE/PARTIAL/UNDEFINED/UNDEPLOYABLE as defined in status.go. Derived from the state of child tasks, calls or other roles
103103
string state = 3; // state machine state for this role
104104
string rolePath = 4; // path to this role within the environment
105105
string environmentId = 5;
@@ -123,7 +123,7 @@ message Ev_RunEvent {
123123
string error = 4;
124124
string transition = 5;
125125
OpStatus transitionStatus = 6;
126-
map<string, string> vars = 7;
126+
reserved 7; // 7 was used for `vars` field that was removed
127127
common.User lastRequestUser = 8;
128128
}
129129

core/environment/environment.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
160160
},
161161
fsm.Callbacks{
162162
"before_event": func(_ context.Context, e *fsm.Event) {
163-
164163
env.Mu.Lock()
165164
env.currentTransition = e.Event
166165
env.Mu.Unlock()
@@ -224,7 +223,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
224223
Error: "",
225224
Transition: e.Event,
226225
TransitionStatus: pb.OpStatus_STARTED,
227-
Vars: nil,
228226
LastRequestUser: env.GetLastRequestUser(),
229227
}, runStartTime)
230228

@@ -265,7 +263,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
265263
Error: "",
266264
Transition: e.Event,
267265
TransitionStatus: pb.OpStatus_STARTED,
268-
Vars: nil,
269266
LastRequestUser: env.GetLastRequestUser(),
270267
}, runEndTime)
271268

@@ -287,7 +284,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
287284
Error: "",
288285
Transition: e.Event,
289286
TransitionStatus: pb.OpStatus_STARTED,
290-
Vars: nil,
291287
LastRequestUser: env.GetLastRequestUser(),
292288
}, runEndTime)
293289

@@ -552,7 +548,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
552548
Error: "",
553549
Transition: e.Event,
554550
TransitionStatus: pb.OpStatus_DONE_OK,
555-
Vars: nil,
556551
LastRequestUser: env.GetLastRequestUser(),
557552
}
558553
if e.Err != nil {
@@ -574,7 +569,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
574569
Error: "",
575570
Transition: e.Event,
576571
TransitionStatus: pb.OpStatus_DONE_OK,
577-
Vars: nil,
578572
LastRequestUser: env.GetLastRequestUser(),
579573
}
580574
if e.Err != nil {
@@ -600,7 +594,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
600594
Error: "",
601595
Transition: e.Event,
602596
TransitionStatus: pb.OpStatus_DONE_OK,
603-
Vars: nil,
604597
LastRequestUser: env.GetLastRequestUser(),
605598
}, runEndCompletionTime)
606599

@@ -648,7 +641,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
648641
}
649642

650643
func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weightPredicate func(callable.HookWeight) bool) (err error) {
651-
652644
// Starting point: get all hooks to be started for the current trigger
653645
hooksMapForTrigger := workflow.GetHooksMapForTrigger(trigger)
654646
callsMapForAwait := env.callsPendingAwait[trigger]

core/environment/manager.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ type Manager struct {
6161
pendingStateChangeCh map[uid.ID]chan *event.TasksStateChangedEvent
6262
}
6363

64-
var (
65-
instance *Manager
66-
)
64+
var instance *Manager
6765

6866
func ManagerInstance() *Manager {
6967
return instance
@@ -145,7 +143,7 @@ func NewEnvManager(tm *task.Manager, incomingEventCh chan event.Event) *Manager
145143
// If there is no pending environment teardown, it means that the released task stopped
146144
// unexpectedly. In that case, the environment should get torn-down only if the task
147145
// is critical.
148-
var releaseCriticalTask = false
146+
releaseCriticalTask := false
149147
for _, v := range typedEvent.GetTaskIds() {
150148
if tm.GetTask(v) != nil {
151149
if tm.GetTask(v).GetTraits().Critical == true {
@@ -407,7 +405,7 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
407405

408406
err = env.TryTransition(NewDeployTransition(
409407
envs.taskman,
410-
nil, //roles,
408+
nil, // roles,
411409
nil),
412410
)
413411

@@ -647,7 +645,6 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
647645
Error: "",
648646
Transition: "TEARDOWN",
649647
TransitionStatus: evpb.OpStatus_STARTED,
650-
Vars: nil,
651648
}, runEndTime)
652649
} else {
653650
log.WithField("partition", environmentId.String()).
@@ -667,7 +664,6 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
667664
Error: "",
668665
Transition: "TEARDOWN",
669666
TransitionStatus: evpb.OpStatus_STARTED,
670-
Vars: nil,
671667
}, runEndCompletionTime)
672668
} else {
673669
log.WithField("partition", environmentId.String()).
@@ -1172,7 +1168,6 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
11721168

11731169
// FIXME: this function should be deduplicated with CreateEnvironment so detector resource matching works correctly
11741170
func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[string]string, newId uid.ID, sub Subscription) {
1175-
11761171
envUserVars := make(map[string]string)
11771172
workflowUserVars := make(map[string]string)
11781173
for k, v := range userVars {
@@ -1296,7 +1291,7 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
12961291

12971292
err = env.TryTransition(NewDeployTransition(
12981293
envs.taskman,
1299-
nil, //roles,
1294+
nil, // roles,
13001295
nil),
13011296
)
13021297
if err == nil {

core/task/task.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ import (
5454
"github.com/AliceO2Group/Control/core/task/taskclass"
5555
"github.com/AliceO2Group/Control/core/task/taskclass/port"
5656
"github.com/AliceO2Group/Control/core/the"
57-
"github.com/mesos/mesos-go/api/v1/lib"
57+
mesos "github.com/mesos/mesos-go/api/v1/lib"
5858
"github.com/sirupsen/logrus"
5959
"github.com/spf13/viper"
6060
)
@@ -112,7 +112,7 @@ type Task struct {
112112
mu sync.RWMutex
113113
parent parentRole
114114
className string
115-
//configuration Descriptor
115+
// configuration Descriptor
116116
name string
117117
hostname string
118118
agentId string
@@ -501,6 +501,15 @@ func (t *Task) GetEnvironmentId() uid.ID {
501501
return t.parent.GetEnvironmentId()
502502
}
503503

504+
func traitsToPbTraits(traits Traits) *evpb.Traits {
505+
return &evpb.Traits{
506+
Trigger: traits.Trigger,
507+
Await: traits.Await,
508+
Timeout: traits.Timeout,
509+
Critical: traits.Critical,
510+
}
511+
}
512+
504513
func (t *Task) SendEvent(ev event.Event) {
505514
if t == nil {
506515
return
@@ -516,6 +525,7 @@ func (t *Task) SendEvent(ev event.Event) {
516525
Hostname: t.hostname,
517526
ClassName: t.className,
518527
Path: t.getParentRolePath(),
528+
Traits: traitsToPbTraits(t.GetTraits()),
519529
}
520530

521531
if t.parent == nil {
@@ -527,8 +537,12 @@ func (t *Task) SendEvent(ev event.Event) {
527537

528538
taskEvent, ok := ev.(*event.TaskEvent)
529539
if ok {
530-
outgoingEvent.State = taskEvent.State
531-
outgoingEvent.Status = taskEvent.Status
540+
if len(taskEvent.State) != 0 {
541+
outgoingEvent.State = taskEvent.State
542+
}
543+
if len(taskEvent.Status) != 0 {
544+
outgoingEvent.Status = taskEvent.Status
545+
}
532546
}
533547
the.EventWriterWithTopic(topic.Task).WriteEvent(outgoingEvent)
534548

@@ -658,7 +672,6 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
658672
// __ptree__:<syntax>:<key> with the plain payload.
659673
keysToDelete := make([]string, 0)
660674
for k, v := range propMap {
661-
662675
if strings.HasPrefix(v, "__ptree__:") {
663676
keysToDelete = append(keysToDelete, k, v)
664677
splitValue := strings.Split(v, ":")
@@ -671,7 +684,6 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
671684
delete(propMap, k)
672685
}
673686
}
674-
675687
}
676688
return propMap, err
677689
}

0 commit comments

Comments
 (0)