-
Notifications
You must be signed in to change notification settings - Fork 115
OCPBUGS-20056: EndpointAccessibleController: Retry the whole list endpoints and send request loop 3 times #855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ import ( | |
| apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
| utilerrors "k8s.io/apimachinery/pkg/util/errors" | ||
| "k8s.io/apimachinery/pkg/util/wait" | ||
| "k8s.io/klog/v2" | ||
|
|
||
| operatorv1 "github.com/openshift/api/operator/v1" | ||
| applyoperatorv1 "github.com/openshift/client-go/operator/applyconfigurations/operator/v1" | ||
|
|
@@ -20,13 +21,34 @@ import ( | |
| "github.com/openshift/library-go/pkg/operator/v1helpers" | ||
| ) | ||
|
|
||
| // The following constants are put together so that | ||
| // all attempts fit safely into resyncInterval. | ||
| const ( | ||
| resyncInterval = 1 * time.Minute | ||
|
|
||
| defaultRequestTimeout = 10 * time.Second | ||
| defaultRetryInterval = 5 * time.Second | ||
| defaultAttemptCount = 3 | ||
| ) | ||
|
|
||
| type endpointAccessibleController struct { | ||
| controllerInstanceName string | ||
| operatorClient v1helpers.OperatorClient | ||
| endpointListFn EndpointListFunc | ||
| getTLSConfigFn EndpointTLSConfigFunc | ||
| availableConditionName string | ||
| endpointCheckDisabledFunc EndpointCheckDisabledFunc | ||
| // httpClient overrides the default TLS client when set; used in tests. | ||
| httpClient *http.Client | ||
| // requestTimeout is the per-request context timeout. Zero means no | ||
| // timeout; defaults to defaultRequestTimeout when unset. | ||
| requestTimeout time.Duration | ||
| // retryInterval is the sleep duration between retry attempts. Zero means | ||
| // no sleep; defaults to defaultRetryInterval when unset. | ||
| retryInterval time.Duration | ||
| // attemptCount is the maximum number of fetch+check cycles; defaults to | ||
| // defaultAttemptCount when unset. | ||
| attemptCount int | ||
| } | ||
|
|
||
| type EndpointListFunc func() ([]string, error) | ||
|
|
@@ -59,7 +81,7 @@ func NewEndpointAccessibleController( | |
| WithInformers(triggers...). | ||
| WithInformers(operatorClient.Informer()). | ||
| WithSync(c.sync). | ||
| ResyncEvery(wait.Jitter(time.Minute, 1.0)). | ||
| ResyncEvery(wait.Jitter(resyncInterval, 1.0)). | ||
| WithSyncDegradedOnError(operatorClient). | ||
| ToController( | ||
| controllerName, // Don't change what is passed here unless you also remove the old FooDegraded condition | ||
|
|
@@ -88,59 +110,104 @@ func (c *endpointAccessibleController) sync(ctx context.Context, syncCtx factory | |
| } | ||
| } | ||
|
|
||
| endpoints, err := c.endpointListFn() | ||
| if err != nil { | ||
| if apierrors.IsNotFound(err) { | ||
| status := applyoperatorv1.OperatorStatus(). | ||
| WithConditions(applyoperatorv1.OperatorCondition(). | ||
| WithType(c.availableConditionName). | ||
| WithStatus(operatorv1.ConditionFalse). | ||
| WithReason("ResourceNotFound"). | ||
| WithMessage(err.Error())) | ||
| return c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, status) | ||
| client := c.httpClient | ||
| if client == nil { | ||
| var err error | ||
| client, err = c.buildTLSClient() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return err | ||
| } | ||
tchap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| client, err := c.buildTLSClient() | ||
| if err != nil { | ||
| return err | ||
| // Retry the full fetch+check cycle so that stale pod IPs from a rolling | ||
| // upgrade are replaced with fresh ones as soon as the Endpoints object is | ||
| // updated between attempts. | ||
| var ( | ||
| endpoints []string | ||
| errors []error | ||
| ) | ||
| attempts := c.attemptCount | ||
| if attempts <= 0 { | ||
| attempts = defaultAttemptCount | ||
| } | ||
| requestTimeout := c.requestTimeout | ||
| if requestTimeout <= 0 { | ||
| requestTimeout = defaultRequestTimeout | ||
| } | ||
| retryInterval := c.retryInterval | ||
| if retryInterval <= 0 { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The comment on the field says "Zero means no sleep" but the code treats <= 0 as "use default 5s". This is contradictory -- what is the intention here? Same issue with `requestTimeout`.
||
| retryInterval = defaultRetryInterval | ||
| } | ||
| // check all the endpoints in parallel. This matters for pods. | ||
| errCh := make(chan error, len(endpoints)) | ||
| wg := sync.WaitGroup{} | ||
| for _, endpoint := range endpoints { | ||
| wg.Add(1) | ||
| go func(endpoint string) { | ||
| defer wg.Done() | ||
|
|
||
| reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) // avoid waiting forever | ||
| defer cancel() | ||
| req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, endpoint, nil) | ||
| if err != nil { | ||
| errCh <- humanizeError(err) | ||
| return | ||
| for i := range attempts { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would it make more sense to take advantage of the |
||
| // Sleep before the next attempt to give the Endpoints object time to | ||
| // be updated (e.g. during a rolling upgrade). | ||
| if i > 0 { | ||
| select { | ||
| case <-time.After(retryInterval): | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| } | ||
tchap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| resp, err := client.Do(req) | ||
| if err != nil { | ||
| errCh <- humanizeError(err) | ||
| return | ||
| var err error | ||
| endpoints, err = c.endpointListFn() | ||
| if err != nil { | ||
| if apierrors.IsNotFound(err) { | ||
| status := applyoperatorv1.OperatorStatus(). | ||
| WithConditions(applyoperatorv1.OperatorCondition(). | ||
| WithType(c.availableConditionName). | ||
| WithStatus(operatorv1.ConditionFalse). | ||
| WithReason("ResourceNotFound"). | ||
| WithMessage(err.Error())) | ||
| return c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, status) | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| if resp.StatusCode > 299 || resp.StatusCode < 200 { | ||
| errCh <- fmt.Errorf("%q returned %q", endpoint, resp.Status) | ||
| if i == attempts-1 { | ||
| return err | ||
| } | ||
| }(endpoint) | ||
| } | ||
| wg.Wait() | ||
| close(errCh) | ||
| klog.FromContext(ctx).Error(err, "Failed to list endpoints, retrying...", "attempt", i) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it worth retrying in case the |
||
| continue | ||
| } | ||
|
|
||
| // Check all the endpoints in parallel. This matters for pods. | ||
| errCh := make(chan error, len(endpoints)) | ||
| wg := sync.WaitGroup{} | ||
| for _, endpoint := range endpoints { | ||
| wg.Add(1) | ||
| go func(endpoint string) { | ||
| defer wg.Done() | ||
|
|
||
| reqCtx, cancel := context.WithTimeout(ctx, requestTimeout) | ||
| defer cancel() | ||
|
|
||
| req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, endpoint, nil) | ||
| if err != nil { | ||
| errCh <- humanizeError(err) | ||
| return | ||
| } | ||
|
|
||
| resp, err := client.Do(req) | ||
| if err != nil { | ||
| errCh <- humanizeError(err) | ||
| return | ||
| } | ||
| defer resp.Body.Close() //nolint:errcheck | ||
|
|
||
| var errors []error | ||
| for err := range errCh { | ||
| errors = append(errors, err) | ||
| if resp.StatusCode > 299 || resp.StatusCode < 200 { | ||
| errCh <- fmt.Errorf("%q returned %q", endpoint, resp.Status) | ||
| } | ||
| }(endpoint) | ||
| } | ||
| wg.Wait() | ||
| close(errCh) | ||
|
|
||
| errors = nil | ||
| for err := range errCh { | ||
| errors = append(errors, err) | ||
| } | ||
|
|
||
| if len(endpoints) > 0 && len(errors) < len(endpoints) { | ||
| break // at least one endpoint responded; no need to retry | ||
| } | ||
| } | ||
|
|
||
| // if at least one endpoint responded, we are available | ||
|
|
@@ -189,7 +256,6 @@ func (c *endpointAccessibleController) buildTLSClient() (*http.Client, error) { | |
| transport.TLSClientConfig = tlsConfig | ||
| } | ||
| return &http.Client{ | ||
| Timeout: 5 * time.Second, | ||
| Transport: transport, | ||
| }, nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not 100% sure about these values. It effectively means there is an inertia of 40 seconds before an actual
`Available=false` issue surfaces. But there is always a trade-off.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a health check endpoint, a 10s request timeout feels a bit too long; the previous 5s is already generous enough in my view.
Regarding the retry interval, I don't have enough knowledge to reason about it -- the goal is to wait for the endpoints to be updated with fresh pod IPs after a rolling upgrade, and I do not know what constitutes enough time for this to happen. If you've tested this and are happy with it, that's good enough for me 👍