Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 56 additions & 13 deletions pkg/framework/machinesets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/google/uuid"
. "github.com/onsi/gomega"

configv1 "github.com/openshift/api/config/v1"
machinev1 "github.com/openshift/api/machine/v1beta1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -53,6 +55,10 @@ const (
)

var (
machineSetScaleResource = schema.GroupResource{Group: machineAPIGroup, Resource: "machinesets"}
scaleClientMu sync.Mutex
cachedScaleClient scale.ScalesGetter

// ErrMachineNotProvisionedInsufficientCloudCapacity is used when we detect that the machine is not being provisioned due to insufficient provider capacity.
ErrMachineNotProvisionedInsufficientCloudCapacity = errors.New("machine creation failed due to insufficient cloud provider capacity")

Expand Down Expand Up @@ -477,55 +483,92 @@ func NewMachineSet(
return &ms
}

// GetMachineSetScale returns the Scale subresource for the named MachineSet.
func GetMachineSetScale(name string) (*autoscalingv1.Scale, error) {
scaleClient, err := getScaleClient()
if err != nil {
return nil, fmt.Errorf("getting scale client: %w", err)
}

scaleObj, err := getMachineSetScale(scaleClient, name)
if err != nil {
return nil, err
}

return scaleObj, nil
}

func getMachineSetScale(scaleClient scale.ScalesGetter, name string) (*autoscalingv1.Scale, error) {
scaleObj, err := scaleClient.Scales(MachineAPINamespace).Get(context.Background(), machineSetScaleResource, name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("getting MachineSet scale: %w", err)
}

return scaleObj, nil
}

// ScaleMachineSet scales a machineSet with a given name to the given number of replicas.
func ScaleMachineSet(name string, replicas int) error {
scaleClient, err := getScaleClient()
if err != nil {
return fmt.Errorf("error calling getScaleClient %w", err)
return fmt.Errorf("getting scale client: %w", err)
}

scale, err := scaleClient.Scales(MachineAPINamespace).Get(context.Background(), schema.GroupResource{Group: machineAPIGroup, Resource: "MachineSet"}, name, metav1.GetOptions{})
scaleObj, err := getMachineSetScale(scaleClient, name)
if err != nil {
return fmt.Errorf("error calling scaleClient.Scales get: %w", err)
return err
}

scaleUpdate := scale.DeepCopy()
scaleUpdate := scaleObj.DeepCopy()
scaleUpdate.Spec.Replicas = int32(replicas)

_, err = scaleClient.Scales(MachineAPINamespace).Update(context.Background(), schema.GroupResource{Group: machineAPIGroup, Resource: "MachineSet"}, scaleUpdate, metav1.UpdateOptions{})
_, err = scaleClient.Scales(MachineAPINamespace).Update(context.Background(), machineSetScaleResource, scaleUpdate, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error calling scaleClient.Scales update: %w", err)
return fmt.Errorf("updating MachineSet scale: %w", err)
}

return nil
}

// getScaleClient returns a ScalesGetter object to manipulate scale subresources.
func getScaleClient() (scale.ScalesGetter, error) {
scaleClientMu.Lock()
defer scaleClientMu.Unlock()

if cachedScaleClient != nil {
return cachedScaleClient, nil
}

cfg, err := config.GetConfig()
if err != nil {
return nil, fmt.Errorf("error getting config %w", err)
return nil, fmt.Errorf("getting config: %w", err)
}

httpClient, err := rest.HTTPClientFor(cfg)
if err != nil {
return nil, fmt.Errorf("error calling rest.HTTPClientFor %w", err)
return nil, fmt.Errorf("building HTTP client: %w", err)
}

mapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
if err != nil {
return nil, fmt.Errorf("error calling NewDiscoveryRESTMapper %w", err)
return nil, fmt.Errorf("building dynamic REST mapper: %w", err)
}

discovery := discovery.NewDiscoveryClientForConfigOrDie(cfg)
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(discovery)
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("building discovery client: %w", err)
}

scaleKindResolver := scale.NewDiscoveryScaleKindResolver(discoveryClient)

scaleClient, err := scale.NewForConfig(cfg, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, fmt.Errorf("error calling building scale client %w", err)
return nil, fmt.Errorf("building scale client: %w", err)
}

return scaleClient, nil
cachedScaleClient = scaleClient

return cachedScaleClient, nil
}

// WaitForMachineSet waits for the all Machines belonging to the named
Expand Down
163 changes: 154 additions & 9 deletions pkg/infra/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package infra
import (
"context"
"fmt"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

configv1 "github.com/openshift/api/config/v1"
machinev1 "github.com/openshift/api/machine/v1beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand All @@ -30,6 +33,33 @@ var nodeDrainLabels = map[string]string{
"node-draining-test": string(uuid.NewUUID()),
}

func findInvalidSelectorCondition(conditions []autoscalingv2.HorizontalPodAutoscalerCondition) *autoscalingv2.HorizontalPodAutoscalerCondition {
for i := range conditions {
condition := &conditions[i]
reason := strings.ToLower(condition.Reason)
message := strings.ToLower(condition.Message)

if reason == "invalidselector" ||
strings.Contains(message, "missing a selector") ||
strings.Contains(message, "selector is required") {
return condition
}
}

return nil
}

func findHPACondition(conditions []autoscalingv2.HorizontalPodAutoscalerCondition, conditionType autoscalingv2.HorizontalPodAutoscalerConditionType) *autoscalingv2.HorizontalPodAutoscalerCondition {
for i := range conditions {
condition := &conditions[i]
if condition.Type == conditionType {
return condition
}
}

return nil
}

func replicationControllerWorkload(namespace string) *corev1.ReplicationController {
var replicas int32 = 20

Expand Down Expand Up @@ -190,24 +220,139 @@ var _ = Describe("[sig-cluster-lifecycle] Machine API Managed cluster should", f
}
})

When("machineset has one replica", framework.LabelDisruptive, func() {
BeforeEach(func() {
var err error
When("machineset has one replica", framework.LabelDisruptive, Ordered, func() {
var sharedMachineSet *machinev1.MachineSet

machineSetParams = framework.BuildMachineSetParams(ctx, client, 1)
BeforeAll(func() {
setupCtx := framework.GetContext()
setupClient, err := framework.LoadClient()
Expect(err).ToNot(HaveOccurred(), "Controller-runtime client should be able to be created for shared MachineSet setup")

By("Creating a new MachineSet")
params := framework.BuildMachineSetParams(setupCtx, setupClient, 1)

machineSet, err = framework.CreateMachineSet(client, machineSetParams)
By("Creating a shared 1-replica MachineSet")

sharedMachineSet, err = framework.CreateMachineSet(setupClient, params)
Expect(err).ToNot(HaveOccurred(), "MachineSet should be able to be created")

framework.WaitForMachineSet(ctx, client, machineSet.GetName())
framework.WaitForMachineSet(setupCtx, setupClient, sharedMachineSet.GetName())
})

AfterAll(func() {
if sharedMachineSet == nil {
return
}

cleanupCtx := framework.GetContext()
cleanupClient, err := framework.LoadClient()
Expect(err).ToNot(HaveOccurred(), "Controller-runtime client should be able to be created for shared MachineSet cleanup")

By("Deleting the shared 1-replica MachineSet")
Expect(cleanupClient.Delete(cleanupCtx, sharedMachineSet)).To(Succeed(), "MachineSet should be able to be deleted")
framework.WaitForMachineSetsDeleted(cleanupCtx, cleanupClient, sharedMachineSet)
})

// Machines required for test: 1
// Reason: This test works on a single machine and its node.
// Reason: Uses the shared 1-replica MachineSet created in BeforeAll to verify that
// the scale subresource publishes a selector for HPA consumers.
It("publish selector in the scale subresource for HPA targets", func() {
expectedSelector := metav1.FormatLabelSelector(&sharedMachineSet.Spec.Selector)
cpuUtilizationTarget := int32(60)

minReplicas := int32(1)
if sharedMachineSet.Spec.Replicas != nil {
minReplicas = *sharedMachineSet.Spec.Replicas
}

maxReplicas := minReplicas + 1

By("Checking the MachineSet scale subresource selector")
Eventually(func() error {
scale, err := framework.GetMachineSetScale(sharedMachineSet.GetName())
if err != nil {
return err
}

if scale.Status.Selector == "" {
return fmt.Errorf("MachineSet scale status.selector is empty")
}

if scale.Status.Selector != expectedSelector {
return fmt.Errorf("expected scale status.selector %q, got %q", expectedSelector, scale.Status.Selector)
}

return nil
}, framework.WaitMedium, framework.RetryMedium).Should(Succeed(), "MachineSet scale selector should match the MachineSet selector")

hpa := &autoscalingv2.HorizontalPodAutoscaler{
TypeMeta: metav1.TypeMeta{
Kind: "HorizontalPodAutoscaler",
APIVersion: "autoscaling/v2",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-hpa-", sharedMachineSet.GetName()),
Namespace: framework.MachineAPINamespace,
},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
APIVersion: "machine.openshift.io/v1beta1",
Kind: "MachineSet",
Name: sharedMachineSet.GetName(),
},
MinReplicas: &minReplicas,
MaxReplicas: maxReplicas,
Metrics: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricSource{
Name: corev1.ResourceCPU,
Target: autoscalingv2.MetricTarget{
Type: autoscalingv2.UtilizationMetricType,
AverageUtilization: &cpuUtilizationTarget,
},
},
},
},
},
}

By("Creating an HPA targeting the MachineSet")
Expect(client.Create(ctx, hpa)).To(Succeed(), "Should be able to create an HPA targeting the MachineSet")
DeferCleanup(func() {
if err := client.Delete(ctx, hpa); err != nil && !apierrors.IsNotFound(err) {
Expect(err).ToNot(HaveOccurred(), "HPA should be able to be deleted")
}
})

By("Waiting for the HPA controller to reconcile the MachineSet target")
Eventually(func() error {
current := &autoscalingv2.HorizontalPodAutoscaler{}
if err := client.Get(ctx, runtimeclient.ObjectKeyFromObject(hpa), current); err != nil {
return err
}

if invalidSelector := findInvalidSelectorCondition(current.Status.Conditions); invalidSelector != nil {
return fmt.Errorf("HPA reported invalid selector: %s (%s)", invalidSelector.Reason, invalidSelector.Message)
}

ableToScale := findHPACondition(current.Status.Conditions, autoscalingv2.AbleToScale)
if ableToScale == nil {
return fmt.Errorf("HPA has not reported the %s condition yet", autoscalingv2.AbleToScale)
}

if ableToScale.Status != corev1.ConditionTrue {
return fmt.Errorf("expected %s condition to be True, got %s (%s: %s)", ableToScale.Type, ableToScale.Status, ableToScale.Reason, ableToScale.Message)
}

return nil
}, framework.WaitMedium, framework.RetryMedium).Should(Succeed(), "HPA should reconcile without invalid selector failures")
})

// Machines required for test: 1
// Reason: Uses the shared 1-replica MachineSet created in BeforeAll and works on a
// single machine and its node. This runs after the HPA-target test because it mutates the fixture.
It("have ability to additively reconcile taints from machine to nodes", framework.LabelPeriodic, func() {
selector := machineSet.Spec.Selector
selector := sharedMachineSet.Spec.Selector
machines, err := framework.GetMachines(ctx, client, &selector)
Expect(err).ToNot(HaveOccurred(), "Listing Machines should succeed")
Expect(machines).ToNot(BeEmpty(), "The list of Machines should not be empty")
Expand Down