Skip to content

Commit 4e307f0

Browse files
committed
Merge commit 'fe854f2' into support-actortemplate-secret-env
2 parents 8376042 + fe854f2 commit 4e307f0

49 files changed

Lines changed: 2160 additions & 1226 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/pr-workflow.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
name: pr-workflow
1616
on:
1717
pull_request:
18+
push:
19+
branches: [main]
1820
jobs:
1921
run-tests:
2022
runs-on: ubuntu-latest
@@ -25,7 +27,6 @@ jobs:
2527
uses: actions/setup-go@v5
2628
with:
2729
go-version: 1.26
28-
- run: go vet -v ./...
2930
- run: go test -v ./...
3031
- name: verify
3132
run: hack/verify-all.sh

.golangci.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ linters:
2626
# the codebase is clean for the basics.
2727
default: standard
2828

29+
enable:
30+
# Spelling errors in comments, strings, and identifiers.
31+
- misspell
32+
2933
settings:
3034
errcheck:
3135
# Functions where ignoring the return is idiomatic in this codebase.
@@ -48,6 +52,9 @@ linters:
4852
- (k8s.io/client-go/tools/cache.SharedIndexInformer).AddEventHandler
4953
- (k8s.io/client-go/tools/cache.SharedIndexInformer).AddIndexers
5054

55+
misspell:
56+
mode: default
57+
5158
exclusions:
5259
# Built-in presets that suppress the most common idiomatic false
5360
# positives across the standard linters.

AGENTS.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,34 @@ Agent Substrate relies on the fact that agent-like applications tend to be idle
99
For development, it's recommended to read the `README.md` and `CONTRIBUTING.md` in the root folder.
1010
See `hack/install-ate.sh` and `tools/setup-gcp` for provisioning and deploying clusters and GCP resources.
1111

12+
## Repository Layout
13+
14+
```
15+
cmd/ # One subdirectory per binary (ateapi, atelet, atenet, …)
16+
internal/ # Shared packages, internal to this module only
17+
pkg/ # Shared packages intended for external import
18+
docs/ # Design docs and developer guides
19+
hack/ # Dev/CI scripts and code generators
20+
manifests/ # Kubernetes YAML for deploying Agent Substrate
21+
demos/ # Self-contained example applications
22+
benchmarking/ # Load-testing tools and workloads
23+
tools/ # Standalone Go tools (go run ./tools/<name>) for Dev/CI
24+
```
25+
26+
**Where to put new Go code — quick rules:**
27+
28+
| Situation | Location |
29+
|---|---|
30+
| Only used by one binary | `cmd/<binary>/internal/<pkg>` |
31+
| Shared across binaries, not for external import | `internal/<pkg>` |
32+
| Public API for external consumers | `pkg/<pkg>` |
33+
| Public proto (control-plane gRPC API) | `pkg/proto/<name>` |
34+
| Internal proto (atelet / ateom) | `internal/proto/<name>` |
35+
| Dev/CI scripts | `hack/` |
36+
| Standalone Go dev/CI tools | `tools/<name>` with its own `go.mod` |
37+
38+
See `docs/dev/code-layout.md` for the full rationale and per-directory details.
39+
1240
## Build and Test Commands
1341

1442
Agent Substrate uses a `Makefile` for its build and test tasks.

LICENSES/github.com/google/go-cmp/cmp/LICENSE

Lines changed: 0 additions & 27 deletions
This file was deleted.

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ lint:
7777

7878
.PHONY: verify
7979
verify: test
80-
$(GO) vet ./...
8180
bash hack/verify-all.sh
8281

8382
.PHONY: clean

benchmarking/workloads/manifests/full_workloads.yaml.tmpl

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,6 @@ spec:
5757
- name: sleep
5858
image: busybox@sha256:1487d0af5f52b4ba31c7e465126ee2123fe3f2305d638e7827681e7cf6c83d5e
5959
command: ["/bin/sleep", "1000000"]
60-
sessionDiscovery:
61-
mapping:
62-
pathPrefix: "/v1/sleep"
63-
protocol:
64-
strategy: CEL
65-
actorIdentity:
66-
cel: "request.host.split('.')[0]"
6760
workerPoolRef:
6861
namespace: benchmark-workloads
6962
name: benchmark-ateom
@@ -90,13 +83,6 @@ spec:
9083
- name: usermem
9184
image: containerstack/alpine-stress:latest
9285
command: ["stress", "--vm", "1", "--vm-bytes", "1G", "--vm-keep", "--vm-hang", "0"]
93-
sessionDiscovery:
94-
mapping:
95-
pathPrefix: "/v1/usermem"
96-
protocol:
97-
strategy: CEL
98-
actorIdentity:
99-
cel: "request.host.split('.')[0]"
10086
workerPoolRef:
10187
namespace: benchmark-workloads
10288
name: benchmark-ateom
@@ -123,13 +109,6 @@ spec:
123109
- name: kernelmem
124110
image: containerstack/alpine-stress:latest
125111
command: ["sh", "-c", "find /; for i in $(seq 50000); do sleep 10000 & done; sleep 10000"]
126-
sessionDiscovery:
127-
mapping:
128-
pathPrefix: "/v1/kernelmem"
129-
protocol:
130-
strategy: CEL
131-
actorIdentity:
132-
cel: "request.host.split('.')[0]"
133112
workerPoolRef:
134113
namespace: benchmark-workloads
135114
name: benchmark-ateom

benchmarking/workloads/manifests/workloads.yaml.tmpl

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,6 @@ spec:
4848
- name: sleep
4949
image: busybox@sha256:1487d0af5f52b4ba31c7e465126ee2123fe3f2305d638e7827681e7cf6c83d5e
5050
command: ["/bin/sleep", "1000000"]
51-
sessionDiscovery:
52-
mapping:
53-
pathPrefix: "/v1/sleep"
54-
protocol:
55-
strategy: CEL
56-
actorIdentity:
57-
cel: "request.host.split('.')[0]"
5851
workerPoolRef:
5952
namespace: benchmark-workloads
6053
name: benchmark-ateom

cmd/ateapi/internal/controlapi/syncer.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ func (s *WorkerPoolSyncer) Start(ctx context.Context) {
6868
return
6969
}
7070
slog.InfoContext(ctx, "Syncer: removing worker from store", slog.String("worker", pod.Namespace+"/"+pod.Name))
71+
if err := s.releaseActorOnDeadWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name); err != nil {
72+
slog.ErrorContext(ctx, "Failed to release actor bound to deleted worker", slog.Any("err", err))
73+
}
7174
err := s.persistence.DeleteWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name)
7275
if err != nil {
7376
slog.ErrorContext(ctx, "Failed to delete worker from store during delete event", slog.Any("err", err))
@@ -97,6 +100,9 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po
97100

98101
if pod.DeletionTimestamp != nil {
99102
slog.InfoContext(ctx, "Syncer: removing worker from store (pod deleting)", slog.String("worker", pod.Namespace+"/"+pod.Name))
103+
if err := s.releaseActorOnDeadWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name); err != nil {
104+
slog.ErrorContext(ctx, "Failed to release actor bound to soft-deleting worker", slog.Any("err", err))
105+
}
100106
err := s.persistence.DeleteWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name)
101107
if err != nil {
102108
slog.ErrorContext(ctx, "Failed to delete worker from store during update event (deleting)", slog.Any("err", err))
@@ -136,3 +142,50 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po
136142
// isWorkerEligible reports whether the pod's status carries a non-empty
// PodIP.
func isWorkerEligible(pod *corev1.Pod) bool {
	podIP := pod.Status.PodIP
	return len(podIP) > 0
}
145+
146+
// releaseActorOnDeadWorker resets the actor bound to a vanishing worker
147+
// pod back to STATUS_SUSPENDED so the next request reassigns it.
148+
//
149+
// UpdateActor uses optimistic version checking. A concurrent SuspendActor
150+
// or ResumeActor wins; we drop this attempt silently.
151+
//
152+
// Best-effort only. The caller always proceeds to DeleteWorker after this
153+
// returns, so any non-contention failure leaves the actor stranded
154+
// (STATUS_RUNNING, pointer at a pod that no longer exists). Recovery
155+
// then needs a manual SuspendActor.
156+
//
157+
// The long-term fix is a finalizer-based controller that holds the pod
158+
// in Terminating state until the actor is gracefully suspended. Tracked
159+
// in https://github.com/agent-substrate/substrate/issues/23.
160+
func (s *WorkerPoolSyncer) releaseActorOnDeadWorker(ctx context.Context, namespace, pool, podName string) error {
161+
worker, err := s.persistence.GetWorker(ctx, namespace, pool, podName)
162+
if err != nil {
163+
if errors.Is(err, store.ErrNotFound) {
164+
return nil
165+
}
166+
return err
167+
}
168+
if worker.GetActorId() == "" {
169+
return nil
170+
}
171+
actor, err := s.persistence.GetActor(ctx, worker.GetActorId())
172+
if err != nil {
173+
if errors.Is(err, store.ErrNotFound) {
174+
return nil
175+
}
176+
return err
177+
}
178+
// Skip if a concurrent SuspendActor already cleared the pointer.
179+
if actor.GetAteomPodNamespace() != namespace || actor.GetAteomPodName() != podName {
180+
return nil
181+
}
182+
actor.Status = ateapipb.Actor_STATUS_SUSPENDED
183+
actor.AteomPodNamespace = ""
184+
actor.AteomPodName = ""
185+
actor.AteomPodIp = ""
186+
actor.InProgressSnapshot = ""
187+
if err := s.persistence.UpdateActor(ctx, actor, actor.GetVersion()); err != nil && !errors.Is(err, store.ErrPersistenceRetry) {
188+
return err
189+
}
190+
return nil
191+
}

cmd/ateapi/internal/controlapi/syncer_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/agent-substrate/substrate/cmd/ateapi/internal/store"
2525
"github.com/agent-substrate/substrate/cmd/ateapi/internal/store/storetest"
26+
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
2627
corev1 "k8s.io/api/core/v1"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/util/wait"
@@ -154,3 +155,62 @@ func TestSyncer_Lifecycle(t *testing.T) {
154155
t.Fatalf("Worker still found in Redis after deletion: %v", err)
155156
}
156157
}
158+
159+
func TestSyncer_DeleteBoundWorker_ClearsActor(t *testing.T) {
160+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
161+
defer cancel()
162+
163+
persistence, fakeK8s, cleanup := setupSyncerTest(t, ctx)
164+
defer cleanup()
165+
166+
ns, pool, pod, ip := "ns-orphan", "pool1", "worker-orphan", "10.0.0.1"
167+
if _, err := fakeK8s.CoreV1().Pods(ns).Create(ctx, &corev1.Pod{
168+
ObjectMeta: metav1.ObjectMeta{Name: pod, Namespace: ns,
169+
Labels: map[string]string{workerPodLabel: pool}},
170+
Status: corev1.PodStatus{Phase: corev1.PodRunning, PodIP: ip,
171+
PodIPs: []corev1.PodIP{{IP: ip}}},
172+
}, metav1.CreateOptions{}); err != nil {
173+
t.Fatalf("create pod: %v", err)
174+
}
175+
if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) {
176+
_, gerr := persistence.GetWorker(c, ns, pool, pod)
177+
return gerr == nil, nil
178+
}); err != nil {
179+
t.Fatalf("worker row not materialised: %v", err)
180+
}
181+
actorID := "actor-orphan"
182+
if err := persistence.CreateActor(ctx, &ateapipb.Actor{
183+
ActorId: actorID, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl",
184+
Status: ateapipb.Actor_STATUS_RUNNING,
185+
AteomPodNamespace: ns, AteomPodName: pod, AteomPodIp: ip,
186+
LastSnapshot: "gs://snapshots/last", InProgressSnapshot: "gs://snapshots/partial",
187+
}); err != nil {
188+
t.Fatalf("create actor: %v", err)
189+
}
190+
w, _ := persistence.GetWorker(ctx, ns, pool, pod)
191+
w.ActorId, w.ActorNamespace, w.ActorTemplate = actorID, ns, "tmpl"
192+
if err := persistence.UpdateWorker(ctx, w, w.Version); err != nil {
193+
t.Fatalf("update worker: %v", err)
194+
}
195+
196+
if err := fakeK8s.CoreV1().Pods(ns).Delete(ctx, pod, metav1.DeleteOptions{}); err != nil {
197+
t.Fatalf("delete pod: %v", err)
198+
}
199+
var got *ateapipb.Actor
200+
if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) {
201+
a, gerr := persistence.GetActor(c, actorID)
202+
if gerr != nil {
203+
return false, gerr
204+
}
205+
got = a
206+
return a.GetStatus() == ateapipb.Actor_STATUS_SUSPENDED, nil
207+
}); err != nil {
208+
t.Fatalf("actor not reset to SUSPENDED: %v", err)
209+
}
210+
if got.AteomPodName != "" || got.AteomPodNamespace != "" || got.AteomPodIp != "" || got.InProgressSnapshot != "" {
211+
t.Errorf("bind fields not cleared: %+v", got)
212+
}
213+
if got.LastSnapshot == "" {
214+
t.Errorf("LastSnapshot must be preserved")
215+
}
216+
}

0 commit comments

Comments
 (0)