Skip to content

Commit ff6c7ac

Browse files
author
Michal Tichák
committed
fixup! [core] deployment retry properly retries only failed tasks
1 parent f74cbc7 commit ff6c7ac

File tree

1 file changed

+15
-18
lines changed

1 file changed

+15
-18
lines changed

core/task/manager.go

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,8 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
497497

498498
// we are retrying deployment multiple times in case of failure and we don't want
499499
// to rerun tasks already running
500-
tasksToRunOneRound := make(Descriptors, len(tasksToRun))
501-
copy(tasksToRunOneRound, tasksToRun)
500+
tasksToRunThisAttempt := make(Descriptors, len(tasksToRun))
501+
copy(tasksToRunThisAttempt, tasksToRun)
502502

503503
allDeployedTasks := make(DeploymentMap)
504504
if len(tasksToRun) > 0 {
@@ -532,26 +532,26 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
532532

533533
outcomeCh := make(chan ResourceOffersOutcome)
534534
m.tasksToDeploy <- &ResourceOffersDeploymentRequest{
535-
tasksToDeploy: tasksToRunOneRound,
535+
tasksToDeploy: tasksToRunThisAttempt,
536536
envId: envId,
537537
outcomeCh: outcomeCh,
538538
} // buffered channel, does not block
539539

540540
log.WithField("partition", envId).
541-
Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRunOneRound))
541+
Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRunThisAttempt))
542542

543543
timeReviveOffers := time.Now()
544544
timeDeployMu := time.Now()
545545
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
546546
<-m.reviveOffersTrg // we only continue when it's done
547547
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
548-
log.WithField("tasksToRunOneRound", len(tasksToRunOneRound)).
548+
log.WithField("tasksToRunThisAttempt", len(tasksToRunThisAttempt)).
549549
WithField("partition", envId))
550550

551551
roOutcome := <-outcomeCh // blocks until a verdict from resourceOffers comes in
552552

553553
utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section",
554-
log.WithField("tasksToRunOneRound", len(tasksToRunOneRound)).
554+
log.WithField("tasksToRunThisAttempt", len(tasksToRunThisAttempt)).
555555
WithField("partition", envId))
556556

557557
deployedThisRound := roOutcome.deployed
@@ -561,21 +561,18 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
561561
logWithId.WithField("tasks", deployedThisRound).
562562
Debugf("resourceOffers is done, %d new tasks running", len(deployedThisRound))
563563

564-
for deployedKey, deployedTask := range deployedThisRound {
565-
allDeployedTasks[deployedKey] = deployedTask
564+
for deployedTask, deployedDescriptor := range deployedThisRound {
565+
allDeployedTasks[deployedTask] = deployedDescriptor
566+
// add deployed tasks to roster, so updates can be distributed properly
567+
m.roster.append(deployedTask)
566568
}
567569

568-
// add deployed tasks to roster, so updates can be distributed properly
569-
for taskPtr := range deployedThisRound {
570-
m.roster.append(taskPtr)
571-
}
572-
573-
if len(deployedThisRound) != len(tasksToRunOneRound) {
570+
if len(deployedThisRound) != len(tasksToRunThisAttempt) {
574571
// ↑ Not all roles could be deployed. If some were critical,
575572
// we cannot proceed with running this environment. Either way,
576573
// we keep the roles running since they might be useful in the future.
577574
logWithId.WithField("level", infologger.IL_Devel).
578-
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRunOneRound), len(deployedThisRound))
575+
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRunThisAttempt), len(deployedThisRound))
579576

580577
for _, desc := range undeployedDescriptors {
581578
if desc.TaskRole.GetTaskTraits().Critical == true {
@@ -609,9 +606,9 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
609606
}
610607
break DEPLOYMENT_ATTEMPTS_LOOP
611608
}
612-
tasksToRunOneRound = make(Descriptors, 0, len(undeployableDescriptors)+len(undeployedDescriptors))
613-
tasksToRunOneRound = append(tasksToRunOneRound, undeployedDescriptors...)
614-
tasksToRunOneRound = append(tasksToRunOneRound, undeployableDescriptors...)
609+
tasksToRunThisAttempt = make(Descriptors, 0, len(undeployableDescriptors)+len(undeployedDescriptors))
610+
tasksToRunThisAttempt = append(tasksToRunThisAttempt, undeployedDescriptors...)
611+
tasksToRunThisAttempt = append(tasksToRunThisAttempt, undeployableDescriptors...)
615612

616613
log.WithField("partition", envId).
617614
WithField("level", infologger.IL_Devel).

0 commit comments

Comments
 (0)