18 changes: 9 additions & 9 deletions pkg/container/watch.go
@@ -10,7 +10,6 @@ import (
"github.com/containerd/typeurl/v2"
"github.com/patrickmn/go-cache"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/zosbase/pkg/stubs"
)

func (c *Module) handlerEventTaskExit(ctx context.Context, ns string, event *events.TaskExit) {
@@ -55,17 +54,18 @@ func (c *Module) handlerEventTaskExit(ctx context.Context, ns string, event *eve
<-time.After(restartDelay) // wait for 2 seconds
reason = c.start(ns, event.ContainerID)
} else {
reason = fmt.Errorf("deleting container due to so many crashes")
reason = fmt.Errorf("restarting container failed due to so many crashes")
}
log.Debug().Err(reason).Msg("failed to restart container")

if reason != nil {
log.Debug().Err(reason).Msg("deleting container due to restart error")
// if reason != nil {
// log.Debug().Err(reason).Msg("deleting container due to restart error")

stub := stubs.NewProvisionStub(c.client)
if err := stub.DecommissionCached(ctx, event.ContainerID, reason.Error()); err != nil {
log.Error().Err(err).Msg("failed to decommission reservation")
}
}
// stub := stubs.NewProvisionStub(c.client)
// if err := stub.DecommissionCached(ctx, event.ContainerID, reason.Error()); err != nil {
// log.Error().Err(err).Msg("failed to decommission reservation")
// }
// }
}

func (c *Module) handleEvent(ctx context.Context, ns string, event interface{}) {
4 changes: 2 additions & 2 deletions pkg/gridtypes/deployment.go
@@ -129,8 +129,8 @@ func (d *Deployment) IsActive() bool {
active := false
for i := range d.Workloads {
wl := &d.Workloads[i]
if !wl.Result.State.IsAny(StateDeleted, StateError) {
// not delete or error so is probably active
if !wl.Result.State.IsAny(StateDeleted) {
// not deleted, so is active (includes StateError which needs retry)
return true
}
}
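A minimal sketch of what this IsActive change means in practice, assuming the gridtypes field names used elsewhere in this diff (Deployment.Workloads, Workload.Result.State) and the zosbase import path: a deployment whose only workload is in the error state now still counts as active, so it remains eligible for retry instead of being treated as dead.

```go
package main

import (
	"fmt"

	// import path assumed from the zosbase references elsewhere in this diff
	"github.com/threefoldtech/zosbase/pkg/gridtypes"
)

func main() {
	// One workload that crashed into the error state.
	d := gridtypes.Deployment{
		Workloads: []gridtypes.Workload{
			{Result: gridtypes.Result{State: gridtypes.StateError}},
		},
	}

	// Before this change IsActive() returned false here (error counted the same
	// as deleted); with the change it returns true, since only StateDeleted
	// marks a workload as inactive.
	fmt.Println(d.IsActive())
}
```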
4 changes: 4 additions & 0 deletions pkg/provision.go
@@ -15,6 +15,10 @@ type Provision interface {
// GetWorkloadStatus: returns status, bool (true if the workload exists, otherwise false), error
GetWorkloadStatus(id string) (gridtypes.ResultState, bool, error)
CreateOrUpdate(twin uint32, deployment gridtypes.Deployment, update bool) error
// SetWorkloadError updates workload state to error without decommissioning it
SetWorkloadError(id string, errorMsg string) error
// SetWorkloadOk updates workload state to ok without decommissioning it
SetWorkloadOk(id string) error
Get(twin uint32, contractID uint64) (gridtypes.Deployment, error)
List(twin uint32) ([]gridtypes.Deployment, error)
Changes(twin uint32, contractID uint64) ([]gridtypes.Workload, error)
66 changes: 66 additions & 0 deletions pkg/provision/engine.go
@@ -1025,6 +1025,72 @@ func (e *NativeEngine) DecommissionCached(id string, reason string) error {
return err
}

// SetWorkloadError updates workload state to error without decommissioning it
func (e *NativeEngine) SetWorkloadError(id string, errorMsg string) error {
log.Info().Str("workload-id", id).Str("error", errorMsg).Msg("setting workload state to error")

globalID := gridtypes.WorkloadID(id)
twin, dlID, name, err := globalID.Parts()
if err != nil {
log.Error().Err(err).Str("workload-id", id).Msg("failed to parse workload ID")
return err
}
log.Debug().Uint32("twin", twin).Uint64("deployment", dlID).Str("name", string(name)).Msg("parsed workload ID")
wl, err := e.storage.Current(twin, dlID, name)
if err != nil {
log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", dlID).Str("name", string(name)).Msg("failed to get workload from storage")
return err
}

if wl.Result.State == gridtypes.StateDeleted ||
wl.Result.State == gridtypes.StateError {
// nothing to do!
return nil
}

// Update result to StateError without uninstalling
result := gridtypes.Result{
Created: gridtypes.Now(),
State: gridtypes.StateError,
Error: errorMsg,
}

return e.storage.Transaction(twin, dlID, wl.WithResults(result))
}

// SetWorkloadOk updates workload state to ok without decommissioning it
func (e *NativeEngine) SetWorkloadOk(id string) error {
log.Info().Str("workload-id", id).Msg("setting workload state to ok")

globalID := gridtypes.WorkloadID(id)
twin, dlID, name, err := globalID.Parts()
if err != nil {
log.Error().Err(err).Str("workload-id", id).Msg("failed to parse workload ID")
return err
}
log.Debug().Uint32("twin", twin).Uint64("deployment", dlID).Str("name", string(name)).Msg("parsed workload ID")
wl, err := e.storage.Current(twin, dlID, name)
if err != nil {
log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", dlID).Str("name", string(name)).Msg("failed to get workload from storage")
return err
}

if wl.Result.State == gridtypes.StateDeleted ||
wl.Result.State == gridtypes.StateOk {
// nothing to do!
return nil
}

// Update result to StateOk without uninstalling
result := gridtypes.Result{
Created: gridtypes.Now(),
State: gridtypes.StateOk,
Error: "",
}

return e.storage.Transaction(twin, dlID, wl.WithResults(result))
}

func (n *NativeEngine) CreateOrUpdate(twin uint32, deployment gridtypes.Deployment, update bool) error {
if err := deployment.Valid(); err != nil {
return err
30 changes: 30 additions & 0 deletions pkg/stubs/provision_stub.go
@@ -176,3 +176,33 @@ func (s *ProvisionStub) ListTwins(ctx context.Context) (ret0 []uint32, ret1 erro
}
return
}

func (s *ProvisionStub) SetWorkloadError(ctx context.Context, arg0 string, arg1 string) (ret0 error) {
args := []interface{}{arg0, arg1}
result, err := s.client.RequestContext(ctx, s.module, s.object, "SetWorkloadError", args...)
if err != nil {
panic(err)
}
result.PanicOnError()
ret0 = result.CallError()
loader := zbus.Loader{}
if err := result.Unmarshal(&loader); err != nil {
panic(err)
}
return
}

func (s *ProvisionStub) SetWorkloadOk(ctx context.Context, arg0 string) (ret0 error) {
args := []interface{}{arg0}
result, err := s.client.RequestContext(ctx, s.module, s.object, "SetWorkloadOk", args...)
if err != nil {
panic(err)
}
result.PanicOnError()
ret0 = result.CallError()
loader := zbus.Loader{}
if err := result.Unmarshal(&loader); err != nil {
panic(err)
}
return
}
146 changes: 125 additions & 21 deletions pkg/vm/monitor.go
@@ -22,6 +22,8 @@ const (
monitorEvery = 10 * time.Second
logrotateEvery = 10 * time.Minute
cleanupEvery = 10 * time.Minute
// cooldownPeriod is the time to wait after a burst of failures before retrying
cooldownPeriod = 6 * time.Hour
)

var (
@@ -36,6 +38,13 @@
)
)

// vmFailureState tracks the failure count and cooldown state for a VM
type vmFailureState struct {
Count int
CooldownUntil time.Time
LastCooldownLog time.Time
}

func (m *Module) logrotate(ctx context.Context) error {
log.Debug().Msg("running log rotations for vms")
running, err := FindAll()
@@ -148,6 +157,14 @@ func (m *Module) cleanupCidata() error {
for _, file := range files {
name := file.Name()
if _, ok := running[name]; !ok {
// Check if VM is being monitored/retried before deleting
marker, exists := m.failures.Get(name)
if exists && marker != permanent {
// VM is in failure tracking (cooldown/retry), don't delete its cloud-init
log.Debug().Str("vm-id", name).Msg("skipping cloud-init cleanup for VM in retry state")
continue
}
log.Debug().Str("vm-id", name).Msg("removing cloud-init for non-running VM")
_ = os.Remove(filepath.Join(dir, name))
continue
}
@@ -164,11 +181,11 @@ func (m *Module) monitorID(ctx context.Context, running map[string]Process, id s
return nil
}
if ps, ok := running[id]; ok {
state, exists, err := stub.GetWorkloadStatus(ctx, id)
workloadState, exists, err := stub.GetWorkloadStatus(ctx, id)
if err != nil {
return errors.Wrapf(err, "failed to get workload status for vm:%s ", id)
}
if !exists || state.IsAny(gridtypes.StateDeleted, gridtypes.StateError) {
if !exists || workloadState.IsAny(gridtypes.StateDeleted) {
log.Debug().Str("name", id).Msg("deleting running vm with no active workload")
m.removeConfig(id)
_ = syscall.Kill(ps.Pid, syscall.SIGKILL)
@@ -182,26 +199,86 @@
marker, ok := m.failures.Get(id)
if !ok {
// no previous value. so this is the first failure
m.failures.Set(id, int(0), cache.DefaultExpiration)
m.failures.Set(id, &vmFailureState{Count: 0, CooldownUntil: time.Time{}, LastCooldownLog: time.Time{}}, cache.DefaultExpiration)
marker, _ = m.failures.Get(id)
}

if marker == permanent {
// if the marker is permanent. it means that this vm
// is being deleted or not monitored. we don't need to take any more action here
// is being deleted or not monitored. we don't need to take any action here
// (don't try to restart or delete)
m.removeConfig(id)
return nil
}

count, err := m.failures.IncrementInt(id, 1)
if err != nil {
// this should never happen because we make sure value
// is set
return errors.Wrap(err, "failed to check number of failure for the vm")
// Check if marker is a vmFailureState or old int format
var state *vmFailureState
switch v := marker.(type) {
case *vmFailureState:
state = v
case int:
// Migrate old int format to new struct format
state = &vmFailureState{Count: v, CooldownUntil: time.Time{}, LastCooldownLog: time.Time{}}
m.failures.Set(id, state, cache.DefaultExpiration)
default:
// Unknown format, reset
state = &vmFailureState{Count: 0, CooldownUntil: time.Time{}, LastCooldownLog: time.Time{}}
m.failures.Set(id, state, cache.DefaultExpiration)
}

// Check if we're in cooldown period
now := time.Now()
if !state.CooldownUntil.IsZero() && now.Before(state.CooldownUntil) {
// Only log cooldown message every 30 minutes
if state.LastCooldownLog.IsZero() || now.Sub(state.LastCooldownLog) >= 30*time.Minute {
log.Debug().Str("name", id).
Time("cooldown_until", state.CooldownUntil).
Dur("remaining", state.CooldownUntil.Sub(now)).
Msg("vm in cooldown period, skipping restart attempt")
state.LastCooldownLog = now
m.failures.Set(id, state, cache.NoExpiration)
}
return nil
}

// Set error state before attempting restart (if not in cooldown)
// This ensures the database reflects the actual state
log.Info().Str("name", id).Msg("vm detected as down, setting state to error")
log.Debug().Str("workload-id", id).Msg("attempting to set vm state to error")
if err := stub.SetWorkloadError(ctx, id, "vm detected as down"); err != nil {
// Check if deployment actually doesn't exist
if strings.Contains(err.Error(), "deployment does not exist") {
// Verify by checking workload status
_, exists, checkErr := stub.GetWorkloadStatus(ctx, id)
if checkErr == nil && !exists {
log.Warn().Str("workload-id", id).Msg("vm deployment confirmed deleted, stopping monitoring")
// Set permanent marker to prevent further restart attempts
m.failures.Set(id, permanent, cache.NoExpiration)
return nil
}
// If we can't confirm or it exists, treat as transient error and continue
log.Warn().Err(err).Str("workload-id", id).Msg("failed to set vm error state, but continuing retry")
} else {
log.Error().Err(err).Str("workload-id", id).Msg("failed to set vm error state")
}
}

// If we just exited cooldown, reset the failure count for a new burst
if !state.CooldownUntil.IsZero() && now.After(state.CooldownUntil) {
log.Info().Str("name", id).Msg("cooldown period expired, resetting failure count for new burst")
state.Count = 0
state.CooldownUntil = time.Time{}
state.LastCooldownLog = time.Time{} // Reset cooldown log timer
// Go back to using DefaultExpiration after cooldown expires
m.failures.Set(id, state, cache.DefaultExpiration)
}

// Increment failure count
state.Count++
m.failures.Set(id, state, cache.DefaultExpiration)

var reason error
if count < failuresBeforeDestroy {
if state.Count < failuresBeforeDestroy {
vm, err := MachineFromFile(m.configPath(id))

if err != nil {
@@ -216,21 +293,48 @@ func (m *Module) monitorID(ctx context.Context, running map[string]Process, id s
return nil
}

log.Debug().Str("name", id).Msg("trying to restart the vm")
log.Debug().Str("name", id).Int("attempt", state.Count).Msg("trying to restart the vm")
if _, err = vm.Run(ctx, m.socketPath(id), m.logsPath(id)); err != nil {
reason = m.withLogs(m.logsPath(id), err)
log.Warn().Err(reason).Str("name", id).Int("failures", state.Count).Msg("vm restart failed")
} else {
// Success! Reset failure count and set state to OK
log.Info().Str("name", id).Msg("vm restarted successfully")
state.Count = 0
state.CooldownUntil = time.Time{}
m.failures.Set(id, state, cache.DefaultExpiration)

// Update workload state to OK since VM is running
log.Debug().Str("workload-id", id).Msg("attempting to set vm state to ok")
if err := stub.SetWorkloadOk(ctx, id); err != nil {
// Check if deployment actually doesn't exist
if strings.Contains(err.Error(), "deployment does not exist") {
// Verify by checking workload status
_, exists, checkErr := stub.GetWorkloadStatus(ctx, id)
if checkErr == nil && !exists {
log.Warn().Str("workload-id", id).Msg("vm deployment confirmed deleted, will be stopped on next monitor cycle")
// Set permanent marker - next monitor cycle will kill VM and clean up config
m.failures.Set(id, permanent, cache.NoExpiration)
return nil
}
// If we can't confirm or it exists, treat as transient error and continue
log.Warn().Err(err).Str("workload-id", id).Msg("failed to set vm state to ok, but VM is running")
} else {
log.Error().Err(err).Str("workload-id", id).Msg("failed to set vm state to ok")
}
}
}
} else {
reason = fmt.Errorf("deleting vm due to so many crashes")
}

if reason != nil {
log.Debug().Err(reason).Msg("deleting vm due to restart error")
m.removeConfig(id)

if err := stub.DecommissionCached(ctx, id, reason.Error()); err != nil {
return errors.Wrapf(err, "failed to decommission reservation '%s'", id)
}
// Reached failure limit, enter cooldown period instead of decommissioning
state.CooldownUntil = now.Add(cooldownPeriod)
// Use NoExpiration to ensure cooldown state persists beyond cache default expiration
m.failures.Set(id, state, cache.NoExpiration)
log.Warn().Str("name", id).
Int("failures", state.Count).
Time("cooldown_until", state.CooldownUntil).
Dur("cooldown_duration", cooldownPeriod).
Msg("vm reached failure limit, entering cooldown period before next retry burst")
// Do NOT call DecommissionCached or removeConfig - keep the VM for retry
}

return nil
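Design note on the cooldown bookkeeping above: failure counters are stored with the cache's default expiration during a retry burst, then switched to NoExpiration once the VM enters cooldown so the deadline outlives the default expiration window. A minimal standalone sketch of that go-cache pattern follows; the cache timings and the failuresBeforeDestroy threshold are illustrative, not values taken from this diff.

```go
package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
)

// vmFailureState mirrors the struct introduced in this diff.
type vmFailureState struct {
	Count           int
	CooldownUntil   time.Time
	LastCooldownLog time.Time
}

func main() {
	// Illustrative cache settings; the real expiration used by the vm module
	// is not shown in this diff.
	failures := cache.New(30*time.Minute, 10*time.Minute)

	// Illustrative threshold; failuresBeforeDestroy is referenced but not
	// defined in the hunks above.
	const failuresBeforeDestroy = 5
	const cooldownPeriod = 6 * time.Hour

	id := "example-vm"
	state := &vmFailureState{}

	// During a retry burst the entry uses DefaultExpiration, so a stale
	// counter simply ages out if the VM stays healthy.
	for state.Count < failuresBeforeDestroy {
		state.Count++
		failures.Set(id, state, cache.DefaultExpiration)
	}

	// Once the burst is exhausted, record the cooldown deadline and switch to
	// NoExpiration so it survives longer than the default expiration window.
	state.CooldownUntil = time.Now().Add(cooldownPeriod)
	failures.Set(id, state, cache.NoExpiration)

	if v, ok := failures.Get(id); ok {
		fmt.Printf("in cooldown until %s\n", v.(*vmFailureState).CooldownUntil)
	}
}
```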