Skip to content

Commit acd7c34

Browse files
committed
[core] Support partial DCS PFR if some detectors not ready for PFR
1 parent 61259bb commit acd7c34

File tree

2 files changed

+64
-40
lines changed

2 files changed

+64
-40
lines changed

core/integration/dcs/plugin.go

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* === This file is part of ALICE O² ===
33
*
4-
* Copyright 2021-2024 CERN and copyright holders of ALICE O².
4+
* Copyright 2021-2025 CERN and copyright holders of ALICE O².
55
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
66
*
77
* This program is free software: you can redistribute it and/or modify
@@ -33,6 +33,7 @@ import (
3333
"fmt"
3434
"io"
3535
"net/url"
36+
"sort"
3637
"strconv"
3738
"strings"
3839
"sync"
@@ -438,8 +439,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
438439
}
439440

440441
// We acquire a grace period during which we hope that DCS will become compatible with the operation.
441-
// During this period we'll keep checking our internal state for op compatibility as reported by DCS at 1Hz,
442-
// and if we don't get a compatible state within the grace period, we declare the operation failed.
442+
// During this period we'll keep checking our internal state for op compatibility as reported by DCS
443+
// at 1Hz, and if we don't get a compatible state for all included detectors within the grace period,
444+
// we go ahead with a partial PFR.
443445
pfrGracePeriod := time.Duration(0)
444446
pfrGracePeriodS, ok := varStack["dcs_pfr_grace_period"]
445447
if ok {
@@ -476,8 +478,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
476478
pfrGraceTimeout := time.Now().Add(pfrGracePeriod)
477479
isCompatibleWithOperation := false
478480

481+
var dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr DCSDetectors
482+
479483
knownDetectorStates := p.getDetectorsPfrAvailability(dcsDetectors)
480-
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
484+
isCompatibleWithOperation, dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
481485

482486
for {
483487
if isCompatibleWithOperation {
@@ -494,34 +498,39 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
494498

495499
if time.Now().Before(pfrGraceTimeout) {
496500
knownDetectorStates = p.getDetectorsPfrAvailability(dcsDetectors)
497-
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
501+
isCompatibleWithOperation, dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
498502
} else {
499503
break
500504
}
501505
}
502506

503-
if !isCompatibleWithOperation {
504-
log.WithError(err).
505-
WithField("level", infologger.IL_Ops).
506-
WithField("partition", envId).
507-
WithField("call", "PrepareForRun").
508-
Error("DCS error")
507+
ecsDetectorsIncompatibleWithPfr := dcsDetectorsIncompatibleWithPfr.EcsDetectorsSlice()
508+
sort.Strings(ecsDetectorsIncompatibleWithPfr)
509509

510-
call.VarStack["__call_error_reason"] = err.Error()
511-
call.VarStack["__call_error"] = callFailedStr
510+
if !isCompatibleWithOperation { // some detectors are not ready for PFR
511+
if len(dcsDetectorsCompatibleWithPfr) == 0 { // if actually none are ready, we bail
512+
log.WithError(err).
513+
WithField("level", infologger.IL_Ops).
514+
WithField("partition", envId).
515+
WithField("call", "PrepareForRun").
516+
Error("DCS error")
512517

513-
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
514-
Name: call.GetName(),
515-
OperationName: call.Func,
516-
OperationStatus: pb.OpStatus_DONE_ERROR,
517-
OperationStep: "acquire detectors availability",
518-
OperationStepStatus: pb.OpStatus_DONE_ERROR,
519-
EnvironmentId: envId,
520-
Payload: string(payloadJson[:]),
521-
Error: err.Error(),
522-
})
518+
call.VarStack["__call_error_reason"] = err.Error()
519+
call.VarStack["__call_error"] = callFailedStr
523520

524-
return
521+
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
522+
Name: call.GetName(),
523+
OperationName: call.Func,
524+
OperationStatus: pb.OpStatus_DONE_ERROR,
525+
OperationStep: "acquire detectors availability",
526+
OperationStepStatus: pb.OpStatus_DONE_ERROR,
527+
EnvironmentId: envId,
528+
Payload: string(payloadJson[:]),
529+
Error: err.Error(),
530+
})
531+
532+
return
533+
}
525534
} else if isCompatibleWithOperation && err != nil {
526535
log.WithField("level", infologger.IL_Ops).
527536
WithField("partition", envId).
@@ -542,11 +551,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
542551
Payload: string(payloadJson[:]),
543552
})
544553

545-
// By now the DCS must be in a compatible state, so we proceed with gathering params for the operation
554+
// By now the DCS must be in a compatible state for at least some detectors, so we proceed
555+
// with gathering params for the operation
546556

557+
if len(ecsDetectorsIncompatibleWithPfr) > 0 {
558+
log.WithField("partition", envId).
559+
WithField("level", infologger.IL_Ops).
560+
Warnf("skipping DCS PFR for detectors: %s - a detector state compatible with PFR was not reached within %s", strings.Join(ecsDetectorsIncompatibleWithPfr, " "), pfrGracePeriod)
561+
}
547562
log.WithField("partition", envId).
548563
WithField("level", infologger.IL_Ops).
549-
Infof("performing DCS PFR for detectors: %s", strings.Join(dcsDetectors.EcsDetectorsSlice(), " "))
564+
Infof("performing DCS PFR for detectors: %s", strings.Join(dcsDetectorsCompatibleWithPfr.EcsDetectorsSlice(), " "))
550565

551566
parameters, ok := varStack["dcs_sor_parameters"]
552567
if !ok {
@@ -560,7 +575,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
560575
bytes := []byte(parameters)
561576
err = json.Unmarshal(bytes, &argMap)
562577
if err != nil {
563-
err = fmt.Errorf("error processing DCS SOR parameters: %w", err)
578+
err = fmt.Errorf("error processing DCS PFR parameters: %w", err)
564579
log.WithError(err).
565580
WithField("partition", envId).
566581
WithField("level", infologger.IL_Ops).
@@ -595,9 +610,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
595610
in := dcspb.PfrRequest{
596611
RunType: rt,
597612
PartitionId: envId,
598-
Detectors: make([]*dcspb.DetectorOperationRequest, len(dcsDetectors)),
613+
Detectors: make([]*dcspb.DetectorOperationRequest, len(dcsDetectorsCompatibleWithPfr)),
599614
}
600-
for i, dcsDet := range dcsDetectors {
615+
for i, dcsDet := range dcsDetectorsCompatibleWithPfr {
601616
ecsDet := dcsToEcsDetector(dcsDet)
602617
perDetectorParameters, okParam := varStack[strings.ToLower(ecsDet)+"_dcs_sor_parameters"]
603618
if !okParam {
@@ -693,7 +708,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
693708
defer cancel()
694709

695710
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
696-
for _, v := range dcsDetectors {
711+
for _, v := range dcsDetectorsCompatibleWithPfr {
697712
detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE
698713
}
699714

@@ -1044,7 +1059,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
10441059

10451060
dcsFailedEcsDetectors := make([]string, 0)
10461061
dcsopOk := true
1047-
for _, v := range dcsDetectors {
1062+
for _, v := range dcsDetectorsCompatibleWithPfr {
10481063
if detectorStatusMap[v] != dcspb.DetectorState_RUN_OK {
10491064
dcsopOk = false
10501065
dcsFailedEcsDetectors = append(dcsFailedEcsDetectors, dcsToEcsDetector(v))
@@ -1173,7 +1188,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11731188
isCompatibleWithOperation := false
11741189

11751190
knownDetectorStates := p.getDetectorsSorAvailability(dcsDetectors)
1176-
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
1191+
isCompatibleWithOperation, _, _, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
11771192

11781193
for {
11791194
if isCompatibleWithOperation {
@@ -1190,7 +1205,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11901205

11911206
if time.Now().Before(sorGraceTimeout) {
11921207
knownDetectorStates = p.getDetectorsSorAvailability(dcsDetectors)
1193-
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
1208+
isCompatibleWithOperation, _, _, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
11941209
} else {
11951210
break
11961211
}

core/integration/dcs/structs.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* === This file is part of ALICE O² ===
33
*
4-
* Copyright 2021-2024 CERN and copyright holders of ALICE O².
4+
* Copyright 2021-2025 CERN and copyright holders of ALICE O².
55
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
66
*
77
* This program is free software: you can redistribute it and/or modify
@@ -102,36 +102,45 @@ func (dsm DCSDetectorOpAvailabilityMap) makeDetectorsByStateMap() map[dcspb.Dete
102102
}
103103

104104
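// Reports whether all provided detectors are in conditionState or NULL_STATE; also returns the detectors compatible with the operation (in conditionState or NULL_STATE), the detectors in any other state, and an error that may be non-nil even when the result is true (no detectors provided, or some detectors in NULL_STATE).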
105-
func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionState dcspb.DetectorState) (bool, error) {
105+
func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionState dcspb.DetectorState) (isCompatible bool, detectorsCompatible DCSDetectors, detectorsIncompatible DCSDetectors, err error) {
106106
detectorsByState := dsm.makeDetectorsByStateMap()
107107

108108
if len(detectorsByState) == 0 {
109-
return true, fmt.Errorf("no detectors provided")
109+
return true, make(DCSDetectors, 0), make(DCSDetectors, 0), fmt.Errorf("no detectors provided")
110110
}
111111

112112
detectorsInConditionState, thereAreDetectorsInConditionState := detectorsByState[conditionState]
113113
detectorsInNullState, thereAreDetectorsInNullState := detectorsByState[dcspb.DetectorState_NULL_STATE]
114114

115115
if thereAreDetectorsInConditionState && (len(detectorsInConditionState) == len(dsm)) {
116116
// all detectors are in conditionState
117-
return true, nil
117+
return true, detectorsInConditionState, make(DCSDetectors, 0), nil
118118
} else if thereAreDetectorsInConditionState && thereAreDetectorsInNullState && (len(detectorsInConditionState)+len(detectorsInNullState) == len(dsm)) {
119119
// all detectors are either in conditionState or in NULL_STATE
120-
return true, fmt.Errorf("detectors %s are in NULL_STATE", strings.Join(detectorsByState[dcspb.DetectorState_NULL_STATE].ToStringSlice(), ", "))
120+
detectorsCompatible = append(detectorsInConditionState, detectorsInNullState...)
121+
return true, detectorsCompatible, make(DCSDetectors, 0), fmt.Errorf("detectors %s are in NULL_STATE", strings.Join(detectorsByState[dcspb.DetectorState_NULL_STATE].ToStringSlice(), ", "))
121122
} else if thereAreDetectorsInNullState && (len(detectorsInNullState) == len(dsm)) {
122123
// all detectors are in NULL_STATE
123-
return true, fmt.Errorf("all detectors are in NULL_STATE")
124+
return true, detectorsInNullState, make(DCSDetectors, 0), fmt.Errorf("all detectors are in NULL_STATE")
124125
} else {
125126
// there are detectors in other states incompatible with conditionState
126127
reportByState := make([]string, 0)
128+
detectorsCompatible = make(DCSDetectors, 0)
129+
detectorsIncompatible = make(DCSDetectors, 0)
127130
for state, detectors := range detectorsByState {
128131
if state == conditionState {
132+
detectorsCompatible = append(detectorsCompatible, detectors...)
129133
continue
130134
}
135+
if state == dcspb.DetectorState_NULL_STATE {
136+
detectorsCompatible = append(detectorsCompatible, detectors...)
137+
} else {
138+
detectorsIncompatible = append(detectorsIncompatible, detectors...)
139+
}
131140
reportByState = append(reportByState,
132141
fmt.Sprintf("%s in %s", strings.Join(detectors.ToStringSlice(), ", "), state.String()))
133142
}
134-
return false, fmt.Errorf("detectors are in incompatible states: %v", strings.Join(reportByState, "; "))
143+
return false, detectorsCompatible, detectorsIncompatible, fmt.Errorf("detectors are in incompatible states: %v", strings.Join(reportByState, "; "))
135144
}
136145
}
137146

0 commit comments

Comments
 (0)