Skip to content

Commit f74cbc7

Browse files
author
Michal Tichák
committed
[core] deployment retry properly retries only failed tasks
1 parent 8b14cc3 commit f74cbc7

File tree

3 files changed

+402
-392
lines changed

3 files changed

+402
-392
lines changed

core/task/manager.go

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828
"context"
2929
"errors"
3030
"fmt"
31-
"github.com/AliceO2Group/Control/common/utils/safeacks"
3231
"os"
3332
"strings"
3433
"sync"
3534
"time"
3635

36+
"github.com/AliceO2Group/Control/common/utils/safeacks"
37+
3738
"github.com/AliceO2Group/Control/apricot"
3839
"github.com/AliceO2Group/Control/common/event"
3940
"github.com/AliceO2Group/Control/common/gera"
@@ -494,7 +495,12 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
494495
undeployableNonCriticalDescriptors := make(Descriptors, 0)
495496
undeployableCriticalDescriptors := make(Descriptors, 0)
496497

497-
deployedTasks := make(DeploymentMap)
498+
// we are retrying deployment multiple times in case of failure and we don't want
499+
// to rerun tasks already running
500+
tasksToRunOneRound := make(Descriptors, len(tasksToRun))
501+
copy(tasksToRunOneRound, tasksToRun)
502+
503+
allDeployedTasks := make(DeploymentMap)
498504
if len(tasksToRun) > 0 {
499505
// Alright, so we have some descriptors whose requirements should be met with
500506
// new Tasks we're about to deploy here.
@@ -524,45 +530,52 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
524530
undeployableNonCriticalDescriptors = make(Descriptors, 0)
525531
undeployableCriticalDescriptors = make(Descriptors, 0)
526532

527-
deployedTasks = make(DeploymentMap)
528-
529533
outcomeCh := make(chan ResourceOffersOutcome)
530534
m.tasksToDeploy <- &ResourceOffersDeploymentRequest{
531-
tasksToDeploy: tasksToRun,
535+
tasksToDeploy: tasksToRunOneRound,
532536
envId: envId,
533537
outcomeCh: outcomeCh,
534538
} // buffered channel, does not block
535539

536540
log.WithField("partition", envId).
537-
Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRun))
541+
Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRunOneRound))
538542

539543
timeReviveOffers := time.Now()
540544
timeDeployMu := time.Now()
541545
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
542546
<-m.reviveOffersTrg // we only continue when it's done
543547
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
544-
log.WithField("tasksToRun", len(tasksToRun)).
548+
log.WithField("tasksToRunOneRound", len(tasksToRunOneRound)).
545549
WithField("partition", envId))
546550

547551
roOutcome := <-outcomeCh // blocks until a verdict from resourceOffers comes in
548552

549553
utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section",
550-
log.WithField("tasksToRun", len(tasksToRun)).
554+
log.WithField("tasksToRunOneRound", len(tasksToRunOneRound)).
551555
WithField("partition", envId))
552556

553-
deployedTasks = roOutcome.deployed
557+
deployedThisRound := roOutcome.deployed
554558
undeployedDescriptors = roOutcome.undeployed
555559
undeployableDescriptors = roOutcome.undeployable
556560

557-
logWithId.WithField("tasks", deployedTasks).
558-
Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks))
561+
logWithId.WithField("tasks", deployedThisRound).
562+
Debugf("resourceOffers is done, %d new tasks running", len(deployedThisRound))
563+
564+
for deployedKey, deployedTask := range deployedThisRound {
565+
allDeployedTasks[deployedKey] = deployedTask
566+
}
559567

560-
if len(deployedTasks) != len(tasksToRun) {
568+
// add deployed tasks to roster, so updates can be distributed properly
569+
for taskPtr := range deployedThisRound {
570+
m.roster.append(taskPtr)
571+
}
572+
573+
if len(deployedThisRound) != len(tasksToRunOneRound) {
561574
// ↑ Not all roles could be deployed. If some were critical,
562575
// we cannot proceed with running this environment. Either way,
563576
// we keep the roles running since they might be useful in the future.
564577
logWithId.WithField("level", infologger.IL_Devel).
565-
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))
578+
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRunOneRound), len(deployedThisRound))
566579

567580
for _, desc := range undeployedDescriptors {
568581
if desc.TaskRole.GetTaskTraits().Critical == true {
@@ -586,7 +599,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
586599
if deploymentSuccess {
587600
// ↑ means all the required critical processes are now running,
588601
// and we are ready to update the envId
589-
for taskPtr, descriptor := range deployedTasks {
602+
for taskPtr, descriptor := range deployedThisRound {
590603
taskPtr.SetParent(descriptor.TaskRole)
591604
// Ensure everything is filled out properly
592605
if !taskPtr.IsLocked() {
@@ -596,6 +609,9 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
596609
}
597610
break DEPLOYMENT_ATTEMPTS_LOOP
598611
}
612+
tasksToRunOneRound = make(Descriptors, 0, len(undeployableDescriptors)+len(undeployedDescriptors))
613+
tasksToRunOneRound = append(tasksToRunOneRound, undeployedDescriptors...)
614+
tasksToRunOneRound = append(tasksToRunOneRound, undeployableDescriptors...)
599615

600616
log.WithField("partition", envId).
601617
WithField("level", infologger.IL_Devel).
@@ -604,12 +620,14 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
604620
}
605621
}
606622

623+
log.Infof("Succeeded to deploy %d/%d tasks", len(allDeployedTasks), len(tasksToRun))
624+
607625
{
608626
logWithIdDev := logWithId.WithField("level", infologger.IL_Devel)
609627
logDescriptors("critical task deployment impossible: ", logWithIdDev.Errorf, undeployableCriticalDescriptors)
610628
logDescriptors("critical task deployment failure: ", logWithIdDev.Errorf, undeployedCriticalDescriptors)
611-
612629
logDescriptors("non-critical task deployment failure: ", logWithIdDev.Warningf, undeployedNonCriticalDescriptors)
630+
613631
logDescriptors("non-critical task deployment impossible: ", logWithIdDev.Warningf, undeployableNonCriticalDescriptors)
614632
}
615633

@@ -624,7 +642,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
624642

625643
if !deploymentSuccess {
626644
var deployedTaskIds []string
627-
for taskPtr := range deployedTasks {
645+
for taskPtr := range allDeployedTasks {
628646
taskPtr.SetParent(nil)
629647
deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
630648
}
@@ -638,12 +656,8 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
638656
}
639657
}
640658

641-
// Finally, we write to the roster. Point of no return!
642-
for taskPtr := range deployedTasks {
643-
m.roster.append(taskPtr)
644-
}
645659
if deploymentSuccess {
646-
for taskPtr := range deployedTasks {
660+
for taskPtr := range allDeployedTasks {
647661
taskPtr.GetParent().SetTask(taskPtr)
648662
}
649663
for taskPtr, descriptor := range tasksAlreadyRunning {

0 commit comments

Comments
 (0)