Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/ateapi/internal/controlapi/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ func TestListWorkers(t *testing.T) {
},
}

if diff := cmp.Diff(want, filteredWorkers, protocmp.Transform()); diff != "" {
if diff := cmp.Diff(want, filteredWorkers, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Worker{}, "worker_pod_uid")); diff != "" {
t.Errorf("ListWorkers response mismatch (-want +got):\n%s", diff)
}
}
Expand Down Expand Up @@ -733,7 +733,7 @@ func TestResumeActor(t *testing.T) {
AteomPodIp: "127.0.0.1",
},
}
if diff := cmp.Diff(want, getResp, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Actor{}, "version")); diff != "" {
if diff := cmp.Diff(want, getResp, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Actor{}, "version"), protocmp.IgnoreFields(&ateapipb.Actor{}, "ateom_pod_uid")); diff != "" {
t.Errorf("GetActor response mismatch (-want +got):\n%s", diff)
}

Expand Down Expand Up @@ -763,7 +763,7 @@ func TestResumeActor(t *testing.T) {
Ip: "127.0.0.1",
}

if diff := cmp.Diff(wantWorker, actorWorker, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Worker{}, "version")); diff != "" {
if diff := cmp.Diff(wantWorker, actorWorker, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Worker{}, "version"), protocmp.IgnoreFields(&ateapipb.Worker{}, "worker_pod_uid")); diff != "" {
t.Errorf("Worker state mismatch (-want +got):\n%s", diff)
}
}
Expand Down Expand Up @@ -943,7 +943,7 @@ func TestSuspendActor(t *testing.T) {
},
}

if diff := cmp.Diff(want, getResp, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Actor{}, "version"), protocmp.IgnoreFields(&ateapipb.Actor{}, "last_snapshot")); diff != "" {
if diff := cmp.Diff(want, getResp, protocmp.Transform(), protocmp.IgnoreFields(&ateapipb.Actor{}, "version"), protocmp.IgnoreFields(&ateapipb.Actor{}, "last_snapshot"), protocmp.IgnoreFields(&ateapipb.Actor{}, "ateom_pod_uid")); diff != "" {
t.Errorf("GetActor response mismatch (-want +got):\n%s", diff)
}

Expand Down
1 change: 1 addition & 0 deletions cmd/ateapi/internal/controlapi/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po
WorkerPool: pod.Labels[workerPodLabel],
WorkerPod: pod.Name,
Ip: pod.Status.PodIP,
WorkerPodUid: string(pod.UID),
})
if err != nil && !errors.Is(err, store.ErrAlreadyExists) {
slog.ErrorContext(ctx, "Failed to create worker in store", slog.Any("err", err))
Expand Down
10 changes: 4 additions & 6 deletions cmd/ateapi/internal/controlapi/workflow_resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, stat
state.Actor.AteomPodNamespace = assignedWorker.GetWorkerNamespace()
state.Actor.AteomPodName = assignedWorker.GetWorkerPod()
state.Actor.AteomPodIp = assignedWorker.GetIp()
state.Actor.AteomPodUid = assignedWorker.GetWorkerPodUid()

if err := s.store.UpdateActor(ctx, state.Actor, state.Actor.GetVersion()); err != nil {
return err
Expand Down Expand Up @@ -213,8 +214,7 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput,
slog.InfoContext(ctx, "Actor has snapshot; Restoring from snapshot")

req := &ateletpb.RestoreRequest{
TargetAteomNamespace: state.Actor.GetAteomPodNamespace(),
TargetAteomName: state.Actor.GetAteomPodName(),
TargetAteomUid: state.Actor.GetAteomPodUid(),
ActorTemplateNamespace: state.Actor.GetActorTemplateNamespace(),
ActorTemplateName: state.Actor.GetActorTemplateName(),
ActorId: state.Actor.GetActorId(),
Expand All @@ -233,8 +233,7 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput,
snapshot := state.ActorTemplate.Status.GoldenSnapshot

req := &ateletpb.RestoreRequest{
TargetAteomNamespace: state.Actor.GetAteomPodNamespace(),
TargetAteomName: state.Actor.GetAteomPodName(),
TargetAteomUid: state.Actor.GetAteomPodUid(),
ActorTemplateNamespace: state.Actor.GetActorTemplateNamespace(),
ActorTemplateName: state.Actor.GetActorTemplateName(),
ActorId: state.Actor.GetActorId(),
Expand All @@ -250,8 +249,7 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput,
} else {
slog.InfoContext(ctx, "Actor has no snapshot; ActorTemplate has no golden snapshot; Booting from ActorTemplate spec")
req := &ateletpb.RunRequest{
TargetAteomNamespace: state.Actor.GetAteomPodNamespace(),
TargetAteomName: state.Actor.GetAteomPodName(),
TargetAteomUid: state.Actor.GetAteomPodUid(),
ActorTemplateNamespace: state.Actor.GetActorTemplateNamespace(),
ActorTemplateName: state.Actor.GetActorTemplateName(),
ActorId: state.Actor.GetActorId(),
Expand Down
3 changes: 1 addition & 2 deletions cmd/ateapi/internal/controlapi/workflow_suspend.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ func (s *CallAteletSuspendStep) Execute(ctx context.Context, input *SuspendInput
}

req := &ateletpb.CheckpointRequest{
TargetAteomNamespace: state.Actor.GetAteomPodNamespace(),
TargetAteomName: state.Actor.GetAteomPodName(),
TargetAteomUid: state.Actor.GetAteomPodUid(),
ActorTemplateNamespace: state.Actor.GetActorTemplateNamespace(),
ActorTemplateName: state.Actor.GetActorTemplateName(),
ActorId: state.Actor.GetActorId(),
Expand Down
26 changes: 13 additions & 13 deletions cmd/atelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,12 @@ func (s *AteomHerder) Run(ctx context.Context, req *ateletpb.RunRequest) (*atele

if err := s.prepareOCIBundles(ctx,
req.GetActorTemplateNamespace(), req.GetActorTemplateName(), req.GetActorId(),
req.GetSpec(), req.GetTargetAteomNamespace(), req.GetTargetAteomName(),
req.GetSpec(), req.GetTargetAteomUid(),
); err != nil {
return nil, err
}

client, err := s.dialAteom(ctx, req.GetTargetAteomNamespace(), req.GetTargetAteomName())
client, err := s.dialAteom(ctx, req.GetTargetAteomUid())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func (s *AteomHerder) Checkpoint(ctx context.Context, req *ateletpb.CheckpointRe

checkpointDir := ateompath.CheckpointDir(req.GetActorTemplateNamespace(), req.GetActorTemplateName(), req.GetActorId())

client, err := s.dialAteom(ctx, req.GetTargetAteomNamespace(), req.GetTargetAteomName())
client, err := s.dialAteom(ctx, req.GetTargetAteomUid())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -397,12 +397,12 @@ func (s *AteomHerder) Restore(ctx context.Context, req *ateletpb.RestoreRequest)
}

if err := s.prepareOCIBundles(ctx, ns, tmpl, actorID,
req.GetSpec(), req.GetTargetAteomNamespace(), req.GetTargetAteomName(),
req.GetSpec(), req.GetTargetAteomUid(),
); err != nil {
return nil, err
}

client, err := s.dialAteom(ctx, req.GetTargetAteomNamespace(), req.GetTargetAteomName())
client, err := s.dialAteom(ctx, req.GetTargetAteomUid())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -442,9 +442,9 @@ func (s *AteomHerder) prepareOCIBundles(
ctx context.Context,
actorTemplateNamespace, actorTemplateName, actorID string,
spec *ateletpb.WorkloadSpec,
targetAteomNamespace, targetAteomName string,
targetAteomUid string,
) error {
netnsPath := ateompath.AteomNetNSPath(targetAteomNamespace, targetAteomName)
netnsPath := ateompath.AteomNetNSPath(targetAteomUid)

g, gCtx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -503,10 +503,10 @@ func (s *AteomHerder) prepareOCIBundles(

// dialAteom opens (or reuses) the gRPC connection to the target ateom
// pod and returns an ateom client.
func (s *AteomHerder) dialAteom(ctx context.Context, namespace, name string) (ateompb.AteomClient, error) {
conn, err := s.ateomDialer.DialAteomPod(ctx, namespace, name)
func (s *AteomHerder) dialAteom(ctx context.Context, targetAteomUid string) (ateompb.AteomClient, error) {
conn, err := s.ateomDialer.DialAteomPod(ctx, targetAteomUid)
if err != nil {
return nil, fmt.Errorf("while getting ateom conn for %s/%s: %w", namespace, name, err)
return nil, fmt.Errorf("while getting ateom conn for %s: %w", targetAteomUid, err)
}
return ateompb.NewAteomClient(conn), nil
}
Expand Down Expand Up @@ -538,16 +538,16 @@ type AteomDialer struct {
conns *lru.Cache
}

func (d *AteomDialer) DialAteomPod(ctx context.Context, namespace, name string) (*grpc.ClientConn, error) {
key := namespace + "/" + name
func (d *AteomDialer) DialAteomPod(ctx context.Context, podUID string) (*grpc.ClientConn, error) {
key := podUID

connAny, ok := d.conns.Get(key)
if ok {
return connAny.(*grpc.ClientConn), nil
}

conn, err := grpc.NewClient(
"unix://"+ateompath.AteomSocketPath(namespace, name),
"unix://"+ateompath.AteomSocketPath(podUID),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
Expand Down
15 changes: 4 additions & 11 deletions cmd/ateom-gvisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ import (
)

var (
podNamespace = flag.String("pod-namespace", "", "The namespace of the current pod")
podName = flag.String("pod-name", "", "The name of the current pod")
podUID = flag.String("pod-uid", "", "The UID of the current pod")

reapLock sync.RWMutex
)
Expand Down Expand Up @@ -82,7 +81,7 @@ func do(ctx context.Context) error {
defer serverboot.ShutdownProvider("TracerProvider", tp.Shutdown)

// Create ateom dir
ateomDir := ateompath.AteomPath(*podNamespace, *podName)
ateomDir := ateompath.AteomPath(*podUID)
if err := os.MkdirAll(ateomDir, 0o700); err != nil {
return fmt.Errorf("in os.MkdirAll(%q): %w", ateomDir, err)
}
Expand All @@ -94,14 +93,8 @@ func do(ctx context.Context) error {
go reap.ReapChildren(nil, nil, nil, &reapLock)
slog.InfoContext(ctx, "Child process reaper launched")

// Validate before opening the socket so the operator sees a clear
// message rather than the kernel's cryptic "bind: invalid argument".
if err := ateompath.ValidateAteomSocketPath(*podNamespace, *podName); err != nil {
return err
}

// Clean up any old socket.
sockPath := ateompath.AteomSocketPath(*podNamespace, *podName)
sockPath := ateompath.AteomSocketPath(*podUID)
if err := os.RemoveAll(sockPath); err != nil {
return fmt.Errorf("while removing %q: %w", sockPath, err)
}
Expand Down Expand Up @@ -130,7 +123,7 @@ func do(ctx context.Context) error {
// read the addresses and routes off of every link in the namespace, then
// remove all the addresses and handle injecting packets into the interfaces
// using AF_PACKET.
interiorNetNS, err := createNetNSWithoutSwitching(ctx, ateompath.AteomNetNSName(*podNamespace, *podName))
interiorNetNS, err := createNetNSWithoutSwitching(ctx, ateompath.AteomNetNSName(*podUID))
if err != nil {
return fmt.Errorf("while creating ateom-interior netns: %w", err)
}
Expand Down
38 changes: 8 additions & 30 deletions internal/ateompath/ateompath.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@
package ateompath

import (
"fmt"
"path/filepath"
)

// MaxUnixSocketPathLen is the practical Linux limit for unix-domain socket
// paths. The kernel's sockaddr_un.sun_path is 108 bytes including the trailing
// NUL, leaving 107 usable bytes. Bind fails with EINVAL above this.
const MaxUnixSocketPathLen = 107

const (
// The base path. This is both the path of the root shared folder on the
// host filesystem, and when it is mounted into ateom and atelet containers.
Expand All @@ -40,45 +34,29 @@ func RunSCBinaryPath(sha256 string) string {
return filepath.Join(StaticFilesDir, "runsc-"+sha256)
}

func AteomPath(ateomNamespace, ateomName string) string {
func AteomPath(podUID string) string {
return filepath.Join(
BasePath,
"ateoms",
ateomNamespace+":"+ateomName,
podUID,
)
}

func AteomSocketPath(ateomNamespace, ateomName string) string {
func AteomSocketPath(podUID string) string {
return filepath.Join(
AteomPath(ateomNamespace, ateomName),
AteomPath(podUID),
"ateom.sock",
)
}

// ValidateAteomSocketPath returns a descriptive error when the socket path
// derived from ateomNamespace and ateomName would exceed Linux's unix-socket
// limit. Calling net.Listen("unix", ...) with an over-limit path otherwise
// fails with the cryptic "bind: invalid argument".
func ValidateAteomSocketPath(ateomNamespace, ateomName string) error {
p := AteomSocketPath(ateomNamespace, ateomName)
if len(p) > MaxUnixSocketPathLen {
return fmt.Errorf(
"ateom socket path %q is %d bytes, exceeds Linux unix-socket limit of %d: shorten the namespace or pod name (%d + %d = %d chars used for namespace + name)",
p, len(p), MaxUnixSocketPathLen,
len(ateomNamespace), len(ateomName), len(ateomNamespace)+len(ateomName),
)
}
return nil
}

func AteomNetNSName(ateomNamespace, ateomName string) string {
return "ateom:" + ateomNamespace + ":" + ateomName
func AteomNetNSName(podUID string) string {
return "ateom:" + podUID
}

func AteomNetNSPath(ateomNamespace, ateomName string) string {
func AteomNetNSPath(podUID string) string {
return filepath.Join(
"/run/netns",
AteomNetNSName(ateomNamespace, ateomName),
AteomNetNSName(podUID),
)
}

Expand Down
83 changes: 36 additions & 47 deletions internal/ateompath/ateompath_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,42 @@ import (
"testing"
)

func TestValidateAteomSocketPath(t *testing.T) {
tests := []struct {
name string
namespace string
podName string
wantErr bool
}{
{
name: "short names well under the limit",
namespace: "ate-demo-counter",
podName: "counter-deployment-abcd1234-xyzw1",
wantErr: false,
},
{
name: "exactly at the limit",
namespace: strings.Repeat("a", 25),
podName: strings.Repeat("b", 45),
wantErr: false,
},
{
name: "one byte over the limit",
namespace: strings.Repeat("a", 25),
podName: strings.Repeat("b", 46),
wantErr: true,
},
{
name: "the reproducer from the original bug report",
namespace: "ate-demo-lovable-sandbox",
podName: "lovable-sandbox-pool-deployment-5797879cd7-2n7wb",
wantErr: true,
},
func TestAteomPath(t *testing.T) {
podUID := "123e4567-e89b-12d3-a456-426614174000"

path := AteomPath(podUID)
expectedSuffix := "/ateoms/" + podUID
if !strings.HasSuffix(path, expectedSuffix) {
t.Errorf("expected path to end with %s, got %s", expectedSuffix, path)
}
}

func TestAteomSocketPathLimits(t *testing.T) {
podUID := "123e4567-e89b-12d3-a456-426614174000"

sockPath := AteomSocketPath(podUID)

// Unix domain socket path limit is 107 bytes (108 with NUL terminator)
const maxUnixSocketLen = 107
if len(sockPath) > maxUnixSocketLen {
t.Errorf("socket path length %d exceeds max allowed length %d: %q", len(sockPath), maxUnixSocketLen, sockPath)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateAteomSocketPath(tt.namespace, tt.podName)
if (err != nil) != tt.wantErr {
t.Errorf("ValidateAteomSocketPath(%q, %q) err=%v, wantErr=%v",
tt.namespace, tt.podName, err, tt.wantErr)
}
if tt.wantErr && err != nil {
// Error message should mention the limit so an operator can
// figure out by how much to shorten.
msg := err.Error()
if !strings.Contains(msg, "107") {
t.Errorf("error message %q does not reference the limit (107)", msg)
}
}
})

// Verify it is deterministic
sockPath2 := AteomSocketPath(podUID)
if sockPath != sockPath2 {
t.Errorf("expected deterministic socket paths, got %q and %q", sockPath, sockPath2)
}
}

func TestAteomPathUniqueness(t *testing.T) {
uid1 := "123e4567-e89b-12d3-a456-426614174000"
uid2 := "987f6543-e21b-32d1-b654-246614174111"

path1 := AteomPath(uid1)
path2 := AteomPath(uid2)

if path1 == path2 {
t.Errorf("expected different paths for different pod UIDs, got %q", path1)
}
}
Loading
Loading