Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/controller/operator/factory/reconcile/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (

appWaitReadyTimeout = 5 * time.Second
vmWaitReadyInterval = 5 * time.Second
vmWaitLogInterval = 60 * time.Second
)

// Init sets package defaults
Expand Down Expand Up @@ -158,6 +159,7 @@ func waitForStatus[T client.Object, ST StatusWithMetadata[STC], STC any](
) error {
lastStatus := obj.GetStatusMetadata()
nsn := types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}
lastLogged := time.Now()
err := wait.PollUntilContextCancel(ctx, interval, false, func(ctx context.Context) (done bool, err error) {
if err = rclient.Get(ctx, nsn, obj); err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -167,6 +169,11 @@ func waitForStatus[T client.Object, ST StatusWithMetadata[STC], STC any](
return
}
lastStatus = obj.GetStatusMetadata()

if lastStatus != nil && time.Now().After(lastLogged.Add(vmWaitLogInterval)) {
logger.WithContext(ctx).V(1).Info(fmt.Sprintf("waiting for %T=%s to be ready, current status: %s", obj, nsn.String(), string(lastStatus.UpdateStatus)))
lastLogged = time.Now()
}
return lastStatus != nil && obj.GetGeneration() == lastStatus.ObservedGeneration && lastStatus.UpdateStatus == status, nil
})
if err != nil {
Expand Down
15 changes: 11 additions & 4 deletions internal/controller/operator/factory/vmdistributed/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte
vmCluster := zs.vmclusters[clusterIdx]
clusterURLHash := fmt.Sprintf("%016X", xxhash.Sum64([]byte(vmCluster.GetRemoteWriteURL())))

nsnCluster := types.NamespacedName{Name: vmCluster.Name, Namespace: vmCluster.Namespace}
logger.WithContext(ctx).Info("ensuring persistent queues are drained", "name", nsnCluster.String())

pollMetrics := func(pctx context.Context, nsn types.NamespacedName, addr string) error {
return wait.PollUntilContextCancel(pctx, interval, true, func(ctx context.Context) (done bool, err error) {
// Query each discovered ip. If any returns non-zero metric, continue polling.
Expand All @@ -315,11 +318,11 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte
continue
}
if v > 0 {
logger.WithContext(ctx).Info("persistent queue on VMAgent instance is not ready", "url", addr, "name", nsn.String(), "size", v)
logger.WithContext(ctx).V(1).Info("persistent queue on VMAgent instance is not ready", "url", addr, "name", nsn.String(), "size", v)
return false, nil
}
}
logger.WithContext(ctx).Info("all persistent queues on VMAgent for given cluster were drained", "url", addr, "name", nsn.String())
logger.WithContext(ctx).V(1).Info("all persistent queues on VMAgent for given cluster were drained", "url", addr, "name", nsn.String())
return true, nil
})
}
Expand All @@ -342,7 +345,7 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte
if m.has(addr) {
continue
}
logger.WithContext(ctx).Info("start polling metrics from VMAgent instance", "url", addr, "name", nsn.String())
logger.WithContext(ctx).V(1).Info("start polling metrics from VMAgent instance", "url", addr, "name", nsn.String())
pctx := m.add(addr)
wg.Go(func() {
if err := pollMetrics(pctx, nsn, addr); err != nil {
Expand All @@ -353,14 +356,18 @@ func (zs *zones) waitForEmptyPQ(ctx context.Context, rclient client.Client, inte
}
for _, addr := range m.ids() {
if _, ok := addrs[addr]; !ok {
logger.WithContext(ctx).Info("stop polling metrics from VMAgent instance", "url", addr, "name", nsn.String())
logger.WithContext(ctx).V(1).Info("stop polling metrics from VMAgent instance", "url", addr, "name", nsn.String())
m.delete(addr)
}
}
}, defaultEndpointsUpdateInterval)
})
}
wg.Wait()

if ctx.Err() == nil {
logger.WithContext(ctx).Info("all persistent queues were drained", "name", nsnCluster.String())
}
}

func newManager(ctx context.Context) *manager {
Expand Down
Loading