Skip to content

Commit c41ab1f

Browse files
committed
[core][executor] more details in logs related to handling roles
1 parent f4da87a commit c41ab1f

4 files changed

Lines changed: 38 additions & 9 deletions

File tree

core/workflow/aggregatorrole.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ func (r *aggregatorRole) updateStatus(s task.Status) {
238238
"child status": s.String(),
239239
"aggregator status": r.status.get().String(),
240240
"aggregator role": r.Name,
241+
"partition": r.GetEnvironmentId().String(),
241242
}).
242243
Trace("aggregator role about to merge incoming child status")
243244
r.status.merge(s, r)
@@ -252,8 +253,10 @@ func (r *aggregatorRole) updateState(s task.State) {
252253
if r == nil {
253254
return
254255
}
255-
log.WithField("role", r.Name).WithField("state", s.String()).Trace("updating state")
256256
r.state.merge(s, r)
257+
log.WithField("role", r.Name).
258+
WithField("partition", r.GetEnvironmentId().String()).
259+
Tracef("updated state to %s upon input state %s", r.state.get().String(), s.String())
257260
r.SendEvent(&event.RoleEvent{Name: r.Name, State: r.state.get().String(), RolePath: r.GetPath()})
258261
if r.parent != nil {
259262
r.parent.updateState(r.state.get())

core/workflow/callrole.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,9 @@ func (t *callRole) UpdateState(s task.State) {
208208

209209
func (t *callRole) updateStatus(s task.Status) {
210210
if t.parent == nil {
211-
log.WithField("status", s.String()).Error("cannot update status with nil parent")
211+
log.WithField("status", s.String()).
212+
WithField("partition", t.GetEnvironmentId().String()).
213+
Error("cannot update status with nil parent")
212214
}
213215
t.status.merge(s, t)
214216
t.SendEvent(&event.RoleEvent{Name: t.Name, Status: t.status.get().String(), RolePath: t.GetPath()})
@@ -217,10 +219,14 @@ func (t *callRole) updateStatus(s task.Status) {
217219

218220
func (t *callRole) updateState(s task.State) {
219221
if t.parent == nil {
220-
log.WithField("state", s.String()).Error("cannot update state with nil parent")
222+
log.WithField("state", s.String()).
223+
WithField("partition", t.GetEnvironmentId().String()).
224+
Error("cannot update state with nil parent")
221225
}
222-
log.WithField("role", t.Name).WithField("state", s.String()).Trace("updating state")
223226
t.state.merge(s, t)
227+
log.WithField("role", t.Name).
228+
WithField("partition", t.GetEnvironmentId().String()).
229+
Tracef("updated state to %s upon input state %s", t.state.get().String(), s.String())
224230
t.SendEvent(&event.RoleEvent{Name: t.Name, State: t.state.get().String(), RolePath: t.GetPath()})
225231
t.parent.updateState(s)
226232
}

core/workflow/taskrole.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,9 @@ func (t *taskRole) UpdateState(s task.State) {
212212

213213
func (t *taskRole) updateStatus(s task.Status) {
214214
if t.parent == nil {
215-
log.WithField("status", s.String()).Error("cannot update status with nil parent")
215+
log.WithField("status", s.String()).
216+
WithField("partition", t.GetEnvironmentId().String()).
217+
Error("cannot update status with nil parent")
216218
}
217219
t.status.merge(s, t)
218220
t.SendEvent(&event.RoleEvent{Name: t.Name, Status: t.status.get().String(), RolePath: t.GetPath()})
@@ -221,10 +223,14 @@ func (t *taskRole) updateStatus(s task.Status) {
221223

222224
func (t *taskRole) updateState(s task.State) {
223225
if t.parent == nil {
224-
log.WithField("state", s.String()).Error("cannot update state with nil parent")
226+
log.WithField("state", s.String()).
227+
WithField("partition", t.GetEnvironmentId().String()).
228+
Error("cannot update state with nil parent")
225229
}
226-
log.WithField("role", t.Name).WithField("state", s.String()).Trace("updating state")
227230
t.state.merge(s, t)
231+
log.WithField("role", t.Name).
232+
WithField("partition", t.GetEnvironmentId().String()).
233+
Tracef("updated state to %s upon input state %s", t.state.get().String(), s.String())
228234
t.SendEvent(&event.RoleEvent{Name: t.Name, State: t.state.get().String(), RolePath: t.GetPath()})
229235
t.parent.updateState(s)
230236
}

executor/executable/controllabletask.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,8 +467,10 @@ func (t *ControllableTask) Launch() error {
467467
if err != nil {
468468
log.WithField("partition", t.knownEnvironmentId.String()).
469469
WithField("detector", t.knownDetector).
470+
WithField("taskId", t.ti.TaskID.GetValue()).
471+
WithField("taskName", t.ti.Name).
470472
WithError(err).
471-
Warning("error marshaling message from task")
473+
Warning("error marshaling message")
472474
} else {
473475
t.sendMessage(jsonEvent)
474476
log.WithField("partition", t.knownEnvironmentId.String()).
@@ -492,6 +494,8 @@ func (t *ControllableTask) Launch() error {
492494
if t.rpc == nil {
493495
log.WithField("partition", t.knownEnvironmentId.String()).
494496
WithField("detector", t.knownDetector).
497+
WithField("taskId", deo.TaskId.GetValue()).
498+
WithField("taskName", t.ti.Name).
495499
WithError(err).
496500
Debug("event stream done")
497501
break
@@ -500,6 +504,8 @@ func (t *ControllableTask) Launch() error {
500504
if err == io.EOF {
501505
log.WithField("partition", t.knownEnvironmentId.String()).
502506
WithField("detector", t.knownDetector).
507+
WithField("taskId", deo.TaskId.GetValue()).
508+
WithField("taskName", t.ti.Name).
503509
WithError(err).
504510
Debug("event stream EOF")
505511
break
@@ -510,7 +516,9 @@ func (t *ControllableTask) Launch() error {
510516
WithField("detector", t.knownDetector).
511517
WithField("errorType", reflect.TypeOf(err)).
512518
WithField("level", infologger.IL_Devel).
513-
Warningf("error receiving event from task %s", deo.TaskId.String())
519+
WithField("taskId", deo.TaskId.GetValue()).
520+
WithField("taskName", t.ti.Name).
521+
Warning("error receiving event")
514522
if status.Code(err) == codes.Unavailable {
515523
break
516524
}
@@ -522,6 +530,8 @@ func (t *ControllableTask) Launch() error {
522530
if deviceEvent == nil {
523531
log.WithField("partition", t.knownEnvironmentId.String()).
524532
WithField("detector", t.knownDetector).
533+
WithField("taskId", deo.TaskId.GetValue()).
534+
WithField("taskName", t.ti.Name).
525535
Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream")
526536
break
527537
} else {
@@ -591,10 +601,14 @@ func (t *ControllableTask) Launch() error {
591601
_ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much
592602
log.WithField("partition", t.knownEnvironmentId.String()).
593603
WithField("detector", t.knownDetector).
604+
WithField("taskId", t.ti.TaskID.GetValue()).
605+
WithField("taskName", t.ti.Name).
594606
Debug("rpc client closed")
595607
t.rpc = nil
596608
log.WithField("partition", t.knownEnvironmentId.String()).
597609
WithField("detector", t.knownDetector).
610+
WithField("taskId", t.ti.TaskID.GetValue()).
611+
WithField("taskName", t.ti.Name).
598612
Debug("rpc client removed")
599613
}
600614

0 commit comments

Comments
 (0)