Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 269 additions & 15 deletions cmd/ateapi/internal/controlapi/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -146,24 +147,30 @@ type FakeAteletServer struct {

Lock sync.Mutex

RunCalled bool
RunCalled bool
RunRequest *ateletpb.RunRequest

CheckpointCalled bool
CheckpointCalled bool
CheckpointRequest *ateletpb.CheckpointRequest

RestoreCalled bool
FailRestore error
RestoreDelay time.Duration
RestoreCalled bool
RestoreRequest *ateletpb.RestoreRequest
FailRestore error
RestoreDelay time.Duration
}

func (f *FakeAteletServer) Reset() {
f.Lock.Lock()
defer f.Lock.Unlock()

f.RunCalled = false
f.RunRequest = nil

f.CheckpointCalled = false
f.CheckpointRequest = nil

f.RestoreCalled = false
f.RestoreRequest = nil
f.FailRestore = nil
f.RestoreDelay = 0
}
Expand All @@ -173,6 +180,7 @@ func (f *FakeAteletServer) Run(ctx context.Context, req *ateletpb.RunRequest) (*
defer f.Lock.Unlock()

f.RunCalled = true
f.RunRequest = proto.Clone(req).(*ateletpb.RunRequest)

return &ateletpb.RunResponse{}, nil
}
Expand All @@ -182,6 +190,7 @@ func (f *FakeAteletServer) Checkpoint(ctx context.Context, req *ateletpb.Checkpo
defer f.Lock.Unlock()

f.CheckpointCalled = true
f.CheckpointRequest = proto.Clone(req).(*ateletpb.CheckpointRequest)

return &ateletpb.CheckpointResponse{}, nil
}
Expand All @@ -191,6 +200,7 @@ func (f *FakeAteletServer) Restore(ctx context.Context, req *ateletpb.RestoreReq
defer f.Lock.Unlock()

f.RestoreCalled = true
f.RestoreRequest = proto.Clone(req).(*ateletpb.RestoreRequest)
if f.RestoreDelay > 0 {
time.Sleep(f.RestoreDelay)
}
Expand All @@ -200,6 +210,16 @@ func (f *FakeAteletServer) Restore(ctx context.Context, req *ateletpb.RestoreReq
return &ateletpb.RestoreResponse{}, nil
}

func (f *FakeAteletServer) lastRestoreRequest() *ateletpb.RestoreRequest {
f.Lock.Lock()
defer f.Lock.Unlock()

if f.RestoreRequest == nil {
return nil
}
return proto.Clone(f.RestoreRequest).(*ateletpb.RestoreRequest)
}

type testContext struct {
mr *miniredis.Miniredis
service *Service
Expand Down Expand Up @@ -261,7 +281,7 @@ func setupTest(t *testing.T, ns string) *testContext {

// 4. Initialize Service
dialer := NewAteletDialer(workerInformer.GetIndexer(), ateletInformer.GetIndexer())
service := NewService(persistence, actorTemplateLister, dialer)
service := NewService(persistence, actorTemplateLister, dialer, k8sClient)

// 5. Start REAL gRPC Server for ATE API
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(ateinterceptors.ServerUnaryInterceptor))
Expand Down Expand Up @@ -330,6 +350,17 @@ func namespaceForTest(baseName string) string {
}

func createTemplate(t *testing.T, tc *testContext, ns string) {
t.Helper()
createTemplateWithContainers(t, tc, ns, []atev1alpha1.Container{
{
Name: "main",
Image: "main@sha256:abc",
Command: []string{"/main"},
},
})
}

func createTemplateWithContainers(t *testing.T, tc *testContext, ns string, containers []atev1alpha1.Container) {
t.Helper()
actorTemplate := &atev1alpha1.ActorTemplate{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -344,13 +375,7 @@ func createTemplate(t *testing.T, tc *testContext, ns string) {
},
},
PauseImage: "pause@sha256:abc",
Containers: []atev1alpha1.Container{
{
Name: "main",
Image: "main@sha256:abc",
Command: []string{"/main"},
},
},
Containers: containers,
WorkerPoolRef: corev1.ObjectReference{
Namespace: ns,
Name: "pool1",
Expand Down Expand Up @@ -733,7 +758,7 @@ func TestResumeActor(t *testing.T) {
AteomPodIp: "127.0.0.1",
},
}
if diff := cmp.Diff(want, getResp, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Actor{}, "version")); diff != "" {
if diff := cmp.Diff(want, getResp, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Actor{}, "version", "last_resolved_env_sha256", "last_resolved_env_at")); diff != "" {
t.Errorf("GetActor response mismatch (-want +got):\n%s", diff)
}

Expand Down Expand Up @@ -768,6 +793,235 @@ func TestResumeActor(t *testing.T) {
}
}

func TestResumeActorResolvesValueFromEnv(t *testing.T) {
ns := namespaceForTest("ns-resume-secret-env")
tc := setupTest(t, ns)
defer tc.cleanup()

_, err := tc.k8sClient.CoreV1().ConfigMaps(ns).Create(context.Background(), &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "settings",
Namespace: ns,
},
Data: map[string]string{
"interval": "45",
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create ConfigMap: %v", err)
}

_, err = tc.k8sClient.CoreV1().Secrets(ns).Create(context.Background(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "api-keys",
Namespace: ns,
},
Data: map[string][]byte{
"anthropic": []byte("sk-test"),
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create secret: %v", err)
}

createTemplateWithContainers(t, tc, ns, []atev1alpha1.Container{
{
Name: "main",
Image: "main@sha256:abc",
Command: []string{"/main"},
Env: []atev1alpha1.EnvVar{
{
Name: "LITERAL",
Value: "plain",
},
{
Name: "INTERVAL_SECONDS",
ValueFrom: &atev1alpha1.EnvVarSource{
ConfigMapKeyRef: &atev1alpha1.ConfigMapKeySelector{
Name: "settings",
Key: "interval",
},
},
},
{
Name: "ANTHROPIC_API_KEY",
ValueFrom: &atev1alpha1.EnvVarSource{
SecretKeyRef: &atev1alpha1.SecretKeySelector{
Name: "api-keys",
Key: "anthropic",
},
},
},
},
},
})
createWorkerPod(t, tc, ns, "worker-1", "node1")

_, err = tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{
ActorTemplateNamespace: ns,
ActorTemplateName: "tmpl1",
ActorId: "id1",
})
if err != nil {
t.Fatalf("CreateActor failed: %v", err)
}
_, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("ResumeActor failed: %v", err)
}

restoreReq := tc.fakeAtelet.lastRestoreRequest()
if restoreReq == nil {
t.Fatalf("expected Restore to be called")
}
if len(restoreReq.GetSpec().GetContainers()) != 1 {
t.Fatalf("expected one container in restore request, got %d", len(restoreReq.GetSpec().GetContainers()))
}
gotEnv := map[string]string{}
for _, env := range restoreReq.GetSpec().GetContainers()[0].GetEnv() {
gotEnv[env.GetName()] = env.GetValue()
}
wantEnv := map[string]string{
"LITERAL": "plain",
"INTERVAL_SECONDS": "45",
"ANTHROPIC_API_KEY": "sk-test",
}
if diff := cmp.Diff(wantEnv, gotEnv); diff != "" {
t.Errorf("resolved env mismatch (-want +got):\n%s", diff)
}

getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("GetActor failed: %v", err)
}
if got, want := getResp.GetActor().GetLastResolvedEnvSha256(), workloadSpecEnvSHA256(restoreReq.GetSpec()); got != want {
t.Errorf("last_resolved_env_sha256 = %q, want %q", got, want)
}
if getResp.GetActor().GetLastResolvedEnvAt() == nil {
t.Errorf("expected last_resolved_env_at to be set")
}
}

func TestResumeActorEnvHashChangeInvalidatesSnapshot(t *testing.T) {
ns := namespaceForTest("ns-resume-env-drift")
tc := setupTest(t, ns)
defer tc.cleanup()

_, err := tc.k8sClient.CoreV1().ConfigMaps(ns).Create(context.Background(), &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "settings",
Namespace: ns,
},
Data: map[string]string{
"interval": "45",
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create ConfigMap: %v", err)
}

createTemplateWithContainers(t, tc, ns, []atev1alpha1.Container{
{
Name: "main",
Image: "main@sha256:abc",
Command: []string{"/main"},
Env: []atev1alpha1.EnvVar{
{
Name: "INTERVAL_SECONDS",
ValueFrom: &atev1alpha1.EnvVarSource{
ConfigMapKeyRef: &atev1alpha1.ConfigMapKeySelector{
Name: "settings",
Key: "interval",
},
},
},
},
},
})
createWorkerPod(t, tc, ns, "worker-1", "node1")

_, err = tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{
ActorTemplateNamespace: ns,
ActorTemplateName: "tmpl1",
ActorId: "id1",
})
if err != nil {
t.Fatalf("CreateActor failed: %v", err)
}
_, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("initial ResumeActor failed: %v", err)
}
firstGet, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("GetActor failed: %v", err)
}
firstEnvHash := firstGet.GetActor().GetLastResolvedEnvSha256()
if firstEnvHash == "" {
t.Fatalf("expected first resume to record an env hash")
}

_, err = tc.client.SuspendActor(context.Background(), &ateapipb.SuspendActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("SuspendActor failed: %v", err)
}
suspended, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("GetActor after suspend failed: %v", err)
}
if suspended.GetActor().GetLastSnapshot() == "" {
t.Fatalf("expected suspend to record a snapshot")
}

cm, err := tc.k8sClient.CoreV1().ConfigMaps(ns).Get(context.Background(), "settings", metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to get ConfigMap: %v", err)
}
cm.Data["interval"] = "90"
if _, err := tc.k8sClient.CoreV1().ConfigMaps(ns).Update(context.Background(), cm, metav1.UpdateOptions{}); err != nil {
t.Fatalf("failed to update ConfigMap: %v", err)
}

tc.fakeAtelet.Reset()
_, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("second ResumeActor failed: %v", err)
}
if !tc.fakeAtelet.RunCalled {
t.Fatalf("expected env hash change to boot from spec")
}
if tc.fakeAtelet.RestoreCalled {
t.Fatalf("expected env hash change to skip snapshot restore")
}

resumed, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("GetActor after second resume failed: %v", err)
}
if resumed.GetActor().GetLastResolvedEnvSha256() == firstEnvHash {
t.Fatalf("expected env hash to change after ConfigMap update")
}
if resumed.GetActor().GetLastSnapshot() != "" {
t.Fatalf("expected invalidated snapshot reference to be cleared, got %q", resumed.GetActor().GetLastSnapshot())
}
}

// TestResumeActor_NoWorkers tests that resuming an actor fails when no free workers are available.
// Workflow:
// 1. Creates a mock ActorTemplate.
Expand Down Expand Up @@ -943,7 +1197,7 @@ func TestSuspendActor(t *testing.T) {
},
}

if diff := cmp.Diff(want, getResp, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Actor{}, "version"), protocmp.IgnoreFields(&ateapipb.Actor{}, "last_snapshot")); diff != "" {
if diff := cmp.Diff(want, getResp, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Actor{}, "version", "last_snapshot", "last_resolved_env_sha256", "last_resolved_env_at")); diff != "" {
t.Errorf("GetActor response mismatch (-want +got):\n%s", diff)
}

Expand Down
Loading
Loading