Skip to content

Commit 7b87bfb

Browse files
author
Michal Tichák
committed
[core] deployment retry properly retries only failed tasks
1 parent 8b14cc3 commit 7b87bfb

File tree

3 files changed

+399
-392
lines changed

3 files changed

+399
-392
lines changed

core/task/manager.go

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828
"context"
2929
"errors"
3030
"fmt"
31-
"github.com/AliceO2Group/Control/common/utils/safeacks"
3231
"os"
3332
"strings"
3433
"sync"
3534
"time"
3635

36+
"github.com/AliceO2Group/Control/common/utils/safeacks"
37+
3738
"github.com/AliceO2Group/Control/apricot"
3839
"github.com/AliceO2Group/Control/common/event"
3940
"github.com/AliceO2Group/Control/common/gera"
@@ -494,7 +495,12 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
494495
undeployableNonCriticalDescriptors := make(Descriptors, 0)
495496
undeployableCriticalDescriptors := make(Descriptors, 0)
496497

497-
deployedTasks := make(DeploymentMap)
498+
// we are retrying deployment multiple times in case of failure and we don't want
499+
// to rerun tasks already running
500+
tasksToRunThisAttempt := make(Descriptors, len(tasksToRun))
501+
copy(tasksToRunThisAttempt, tasksToRun)
502+
503+
allDeployedTasks := make(DeploymentMap)
498504
if len(tasksToRun) > 0 {
499505
// Alright, so we have some descriptors whose requirements should be met with
500506
// new Tasks we're about to deploy here.
@@ -524,45 +530,49 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
524530
undeployableNonCriticalDescriptors = make(Descriptors, 0)
525531
undeployableCriticalDescriptors = make(Descriptors, 0)
526532

527-
deployedTasks = make(DeploymentMap)
528-
529533
outcomeCh := make(chan ResourceOffersOutcome)
530534
m.tasksToDeploy <- &ResourceOffersDeploymentRequest{
531-
tasksToDeploy: tasksToRun,
535+
tasksToDeploy: tasksToRunThisAttempt,
532536
envId: envId,
533537
outcomeCh: outcomeCh,
534538
} // buffered channel, does not block
535539

536540
log.WithField("partition", envId).
537-
Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRun))
541+
Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRunThisAttempt))
538542

539543
timeReviveOffers := time.Now()
540544
timeDeployMu := time.Now()
541545
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
542546
<-m.reviveOffersTrg // we only continue when it's done
543547
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
544-
log.WithField("tasksToRun", len(tasksToRun)).
548+
log.WithField("tasksToRunThisAttempt", len(tasksToRunThisAttempt)).
545549
WithField("partition", envId))
546550

547551
roOutcome := <-outcomeCh // blocks until a verdict from resourceOffers comes in
548552

549553
utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section",
550-
log.WithField("tasksToRun", len(tasksToRun)).
554+
log.WithField("tasksToRunThisAttempt", len(tasksToRunThisAttempt)).
551555
WithField("partition", envId))
552556

553-
deployedTasks = roOutcome.deployed
557+
deployedThisAttempt := roOutcome.deployed
554558
undeployedDescriptors = roOutcome.undeployed
555559
undeployableDescriptors = roOutcome.undeployable
556560

557-
logWithId.WithField("tasks", deployedTasks).
558-
Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks))
561+
logWithId.WithField("tasks", deployedThisAttempt).
562+
Debugf("resourceOffers is done, %d new tasks running", len(deployedThisAttempt))
559563

560-
if len(deployedTasks) != len(tasksToRun) {
564+
for deployedTask, deployedDescriptor := range deployedThisAttempt {
565+
allDeployedTasks[deployedTask] = deployedDescriptor
566+
// add deployed tasks to roster, so updates can be distributed properly
567+
m.roster.append(deployedTask)
568+
}
569+
570+
if len(deployedThisAttempt) != len(tasksToRunThisAttempt) {
561571
// ↑ Not all roles could be deployed. If some were critical,
562572
// we cannot proceed with running this environment. Either way,
563573
// we keep the roles running since they might be useful in the future.
564574
logWithId.WithField("level", infologger.IL_Devel).
565-
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))
575+
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRunThisAttempt), len(deployedThisAttempt))
566576

567577
for _, desc := range undeployedDescriptors {
568578
if desc.TaskRole.GetTaskTraits().Critical == true {
@@ -586,7 +596,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
586596
if deploymentSuccess {
587597
// ↑ means all the required critical processes are now running,
588598
// and we are ready to update the envId
589-
for taskPtr, descriptor := range deployedTasks {
599+
for taskPtr, descriptor := range deployedThisAttempt {
590600
taskPtr.SetParent(descriptor.TaskRole)
591601
// Ensure everything is filled out properly
592602
if !taskPtr.IsLocked() {
@@ -596,6 +606,9 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
596606
}
597607
break DEPLOYMENT_ATTEMPTS_LOOP
598608
}
609+
tasksToRunThisAttempt = make(Descriptors, 0, len(undeployableDescriptors)+len(undeployedDescriptors))
610+
tasksToRunThisAttempt = append(tasksToRunThisAttempt, undeployedDescriptors...)
611+
tasksToRunThisAttempt = append(tasksToRunThisAttempt, undeployableDescriptors...)
599612

600613
log.WithField("partition", envId).
601614
WithField("level", infologger.IL_Devel).
@@ -604,12 +617,14 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
604617
}
605618
}
606619

620+
log.Infof("Succeeded to deploy %d/%d tasks", len(allDeployedTasks), len(tasksToRun))
621+
607622
{
608623
logWithIdDev := logWithId.WithField("level", infologger.IL_Devel)
609624
logDescriptors("critical task deployment impossible: ", logWithIdDev.Errorf, undeployableCriticalDescriptors)
610625
logDescriptors("critical task deployment failure: ", logWithIdDev.Errorf, undeployedCriticalDescriptors)
611-
612626
logDescriptors("non-critical task deployment failure: ", logWithIdDev.Warningf, undeployedNonCriticalDescriptors)
627+
613628
logDescriptors("non-critical task deployment impossible: ", logWithIdDev.Warningf, undeployableNonCriticalDescriptors)
614629
}
615630

@@ -624,7 +639,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
624639

625640
if !deploymentSuccess {
626641
var deployedTaskIds []string
627-
for taskPtr := range deployedTasks {
642+
for taskPtr := range allDeployedTasks {
628643
taskPtr.SetParent(nil)
629644
deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
630645
}
@@ -638,12 +653,8 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
638653
}
639654
}
640655

641-
// Finally, we write to the roster. Point of no return!
642-
for taskPtr := range deployedTasks {
643-
m.roster.append(taskPtr)
644-
}
645656
if deploymentSuccess {
646-
for taskPtr := range deployedTasks {
657+
for taskPtr := range allDeployedTasks {
647658
taskPtr.GetParent().SetTask(taskPtr)
648659
}
649660
for taskPtr, descriptor := range tasksAlreadyRunning {

0 commit comments

Comments
 (0)