Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ linters:
enable:
# Spelling errors in comments, strings, and identifiers.
- misspell
# Reconciliation and RPC calls should inherit the caller's context.
- contextcheck
# High-signal bug-pattern checks without enabling style-only churn.
- gocritic
# CRD phase/state-machine switches should handle every declared state.
- exhaustive

settings:
errcheck:
Expand All @@ -55,6 +61,41 @@ linters:
misspell:
mode: default

gocritic:
disable-all: true
enabled-checks:
- appendAssign
- argOrder
- badCall
- badCond
- badLock
- badRegexp
- badSorting
- badSyncOnceFunc
- caseOrder
- dupArg
- dupBranchBody
- dupCase
- dupSubExpr
- dynamicFmtString
- exitAfterDefer
- mapKey
- nilValReturn
- offBy1
- regexpMust
- sloppyLen
- sloppyReassign
- sloppyTypeAssert
- truncateCmp
- uncheckedInlineErr

exhaustive:
check:
- switch
- map
default-signifies-exhaustive: false
package-scope-only: true

exclusions:
# Built-in presets that suppress the most common idiomatic false
# positives across the standard linters.
Expand All @@ -71,18 +112,22 @@ linters:
rules:
# Generated code: ignore findings entirely.
- path: '\.pb\.go$'
linters: [errcheck, govet, ineffassign, staticcheck, unused]
linters: [errcheck, govet, ineffassign, staticcheck, unused, contextcheck, gocritic, exhaustive]
- path: '\.pb\.gw\.go$'
linters: [errcheck, govet, ineffassign, staticcheck, unused]
linters: [errcheck, govet, ineffassign, staticcheck, unused, contextcheck, gocritic, exhaustive]
- path: 'zz_generated.*\.go$'
linters: [errcheck, govet, ineffassign, staticcheck, unused]
linters: [errcheck, govet, ineffassign, staticcheck, unused, contextcheck, gocritic, exhaustive]
# Kubernetes client-gen / informer-gen / lister-gen output.
- path: '^pkg/client/'
linters: [errcheck, govet, ineffassign, staticcheck, unused]
linters: [errcheck, govet, ineffassign, staticcheck, unused, contextcheck, gocritic, exhaustive]
# Tests routinely discard error returns from setup helpers.
- path: '_test\.go'
linters:
- errcheck
# Demo binaries intentionally favor concise examples over production
# control-flow discipline.
- path: '^demos/'
linters: [gocritic, contextcheck]
# Suppress staticcheck QF* (quickfix) recommendations. They are
# stylistic rewrites rather than bug detectors and would generate a
# large amount of churn if applied indiscriminately.
Expand Down
4 changes: 2 additions & 2 deletions cmd/ateapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func main() {
if err != nil {
serverboot.Fatal(ctx, "Failed to initialize tracing", err)
}
defer serverboot.ShutdownProvider("TracerProvider", tp.Shutdown)
defer serverboot.ShutdownProvider(ctx, "TracerProvider", tp.Shutdown)

mp, err := serverboot.InitMetrics(ctx, "ateapi")
if err != nil {
serverboot.Fatal(ctx, "Failed to initialize metrics", err)
}
defer serverboot.ShutdownProvider("MeterProvider", mp.Shutdown)
defer serverboot.ShutdownProvider(ctx, "MeterProvider", mp.Shutdown)

loadFlagsFromEnv()
logFlagValues(ctx)
Expand Down
4 changes: 2 additions & 2 deletions cmd/atelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ func main() {
if err != nil {
serverboot.Fatal(ctx, "Failed to initialize tracing", err)
}
defer serverboot.ShutdownProvider("TracerProvider", tp.Shutdown)
defer serverboot.ShutdownProvider(ctx, "TracerProvider", tp.Shutdown)

mp, err := serverboot.InitMetrics(ctx, "atelet")
if err != nil {
serverboot.Fatal(ctx, "Failed to initialize metrics", err)
}
defer serverboot.ShutdownProvider("MeterProvider", mp.Shutdown)
defer serverboot.ShutdownProvider(ctx, "MeterProvider", mp.Shutdown)

go serverboot.StartMetricsServer(ctx, serverboot.MetricsServerOptions{Addr: *metricsListenAddr})

Expand Down
5 changes: 4 additions & 1 deletion cmd/atelet/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,15 @@ func untar(ctx context.Context, tarData io.Reader, rootPath string) error {
return fmt.Errorf("while creating file %q: %w", name, err)
}

_, err = io.Copy(outFile, tarReader)
written, err := io.CopyN(outFile, tarReader, hdr.Size)
closeErr := outFile.Close()

if err != nil {
return fmt.Errorf("while writing contents of %q from tar stream: %w", name, err)
}
if written != hdr.Size {
return fmt.Errorf("while writing contents of %q from tar stream: wrote %d bytes, expected %d", name, written, hdr.Size)
}
if closeErr != nil {
return fmt.Errorf("while closing file %q: %w", name, closeErr)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/atenet/internal/app/router/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *Controller) reconcile(ctx context.Context) error {
return err
}

if err := c.xdsSrv.UpdateSnapshot(); err != nil {
if err := c.xdsSrv.UpdateSnapshot(ctx); err != nil {
slog.ErrorContext(ctx, "xDS Configuration generation problem", slog.String("err", err.Error()))
return err
}
Expand Down
16 changes: 15 additions & 1 deletion cmd/atenet/internal/app/router/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,23 @@ func mapResumeError(actorID string, err error) error {

re := &reqError{cause: err}
switch status.Code(err) {
case codes.OK:
return nil
case codes.Canceled:
re.statusCode = 499
re.msg = fmt.Sprintf("actor %q request canceled", actorID)
case codes.Unknown, codes.Internal, codes.DataLoss:
re.statusCode = int(envoy_type.StatusCode_InternalServerError)
re.msg = fmt.Sprintf("error resuming actor %q", actorID)
case codes.InvalidArgument, codes.OutOfRange:
re.statusCode = int(envoy_type.StatusCode_BadRequest)
re.msg = fmt.Sprintf("actor %q request invalid", actorID)
case codes.NotFound:
re.statusCode = int(envoy_type.StatusCode_NotFound)
re.msg = fmt.Sprintf("actor %q not found", actorID)
case codes.AlreadyExists, codes.Aborted:
re.statusCode = int(envoy_type.StatusCode_Conflict)
re.msg = fmt.Sprintf("actor %q request conflicted", actorID)
case codes.FailedPrecondition:
// Preserve the gRPC description for FailedPrecondition only: it carries
// actionable client-facing context (e.g. "no free workers available")
Expand All @@ -84,7 +98,7 @@ func mapResumeError(actorID string, err error) error {
case codes.ResourceExhausted:
re.statusCode = int(envoy_type.StatusCode_TooManyRequests)
re.msg = fmt.Sprintf("actor %q rate limited", actorID)
default:
case codes.Unimplemented:
re.statusCode = int(envoy_type.StatusCode_InternalServerError)
re.msg = fmt.Sprintf("error resuming actor %q", actorID)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/atenet/internal/app/router/resumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewActorResumer(apiClient ateapipb.ControlClient) *ActorResumer {
// ResumeActor ensures the requested actor is running. It deduplicates concurrent
// requests within the process and retries when needed.
func (r *ActorResumer) ResumeActor(ctx context.Context, actorID string) (*ateapipb.Actor, error) {
ch := r.flight.DoChan(actorID, func() (interface{}, error) {
ch := r.flight.DoChan(actorID, func() (interface{}, error) { //nolint:contextcheck // Shared resume work must outlive the first caller's cancellation.
// We detach the context from the first caller using a fixed background timeout.
// This guarantees that if Caller 1 disconnects or times out, the underlying
// resume operation continues running for Caller 2 and Caller 3 without failing.
Expand Down
2 changes: 1 addition & 1 deletion cmd/atenet/internal/app/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *RouterServer) Run(ctx context.Context) error {

g, ctx := errgroup.WithContext(ctx)

xdsSrv := NewXdsServer(s.cfg.XdsPort)
xdsSrv := NewXdsServer(ctx, s.cfg.XdsPort)
xdsSrv.SetConfig(s.cfg.HttpPort, s.cfg.ExtprocPort, s.cfg.ExtprocAddr)

var certContent, keyContent string
Expand Down
10 changes: 5 additions & 5 deletions cmd/atenet/internal/app/router/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ type XdsServer struct {
keyContent string
}

func NewXdsServer(xdsPort int) *XdsServer {
func NewXdsServer(ctx context.Context, xdsPort int) *XdsServer {
cache := cachev3.NewSnapshotCache(true, cachev3.IDHash{}, nil)
srv := serverv3.NewServer(context.Background(), cache, nil)
srv := serverv3.NewServer(ctx, cache, nil)

return &XdsServer{
xdsPort: xdsPort,
Expand Down Expand Up @@ -113,7 +113,7 @@ func (x *XdsServer) SetTlsConfig(httpsPort int, certPath string, certContent str
x.keyContent = keyContent
}

func (x *XdsServer) UpdateSnapshot() error {
func (x *XdsServer) UpdateSnapshot(ctx context.Context) error {
x.mu.Lock()
defer x.mu.Unlock()

Expand Down Expand Up @@ -155,12 +155,12 @@ func (x *XdsServer) UpdateSnapshot() error {
}

slog.Info("Deploying updated xDS configuration snapshot", slog.String("version", ver))
return x.snapshot.SetSnapshot(context.Background(), NodeID, snapshot)
return x.snapshot.SetSnapshot(ctx, NodeID, snapshot)
}

func (x *XdsServer) Serve(ctx context.Context, lis net.Listener) error {
// Ensure a first snapshot is deployed
if err := x.UpdateSnapshot(); err != nil {
if err := x.UpdateSnapshot(ctx); err != nil {
slog.ErrorContext(ctx, "Warning - initial xDS setup update failed", slog.String("err", err.Error()))
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/atenet/internal/app/router/xds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
)

func TestXdsServer_UpdateSnapshot(t *testing.T) {
server := NewXdsServer(18000)
server := NewXdsServer(context.Background(), 18000)
server.SetConfig(8081, 50052, "10.0.0.1")

err := server.UpdateSnapshot()
err := server.UpdateSnapshot(context.Background())
if err != nil {
t.Fatalf("UpdateSnapshot failed: %v", err)
}
Expand Down Expand Up @@ -139,11 +139,11 @@ func TestXdsServer_UpdateSnapshot(t *testing.T) {
}

func TestXdsServer_UpdateSnapshot_WithHttps(t *testing.T) {
server := NewXdsServer(18000)
server := NewXdsServer(context.Background(), 18000)
server.SetConfig(8085, 50053, "127.0.0.1")
server.SetTlsConfig(8443, "", "dummy-cert", "dummy-key")

err := server.UpdateSnapshot()
err := server.UpdateSnapshot(context.Background())
if err != nil {
t.Fatalf("UpdateSnapshot failed: %v", err)
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestXdsServer_UpdateSnapshot_WithHttps(t *testing.T) {
}

func TestXdsServer_Serve_Shutdown(t *testing.T) {
server := NewXdsServer(18000)
server := NewXdsServer(context.Background(), 18000)
server.SetConfig(8085, 50053, "127.0.0.1")

lis, err := net.Listen("tcp", "127.0.0.1:0")
Expand Down
5 changes: 2 additions & 3 deletions cmd/ateom-gvisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func do(ctx context.Context) error {
if err != nil {
serverboot.Fatal(ctx, "Failed to initialize tracing", err)
}
defer serverboot.ShutdownProvider("TracerProvider", tp.Shutdown)
defer serverboot.ShutdownProvider(ctx, "TracerProvider", tp.Shutdown)

// Create ateom dir
ateomDir := ateompath.AteomPath(*podNamespace, *podName)
Expand Down Expand Up @@ -146,8 +146,7 @@ func do(ctx context.Context) error {
reflection.Register(svr)

if err := svr.Serve(lis); err != nil {
slog.ErrorContext(ctx, "Failed to serve", slog.Any("err", err))
os.Exit(1)
return fmt.Errorf("failed to serve: %w", err)
}

return nil
Expand Down
2 changes: 2 additions & 0 deletions demos/claude-code-multiplex/ui/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ func newKubeClient() (*kubernetes.Clientset, error) {
// / etc).
func actorStatusString(s ateapipb.Actor_Status) string {
switch s {
case ateapipb.Actor_STATUS_UNSPECIFIED:
return "?"
case ateapipb.Actor_STATUS_RESUMING:
return "Resuming"
case ateapipb.Actor_STATUS_RUNNING:
Expand Down
2 changes: 2 additions & 0 deletions internal/controllers/actortemplate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (r *ActorTemplateReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
case atev1alpha1.PhaseReady:
return ctrl.Result{}, nil
case atev1alpha1.PhaseFailed:
return ctrl.Result{}, nil
default:
return ctrl.Result{}, fmt.Errorf("unrecognized phase %q", at.Status.Phase)
}
Expand Down
7 changes: 3 additions & 4 deletions internal/rendezvous/rendezvous.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ import (
"k8s.io/utils/ptr"
)

const labelKey = "rendezvous.ate.dev/application"

var (
const (
labelKey = "rendezvous.ate.dev/application"
leaseDuration = 15 * time.Second
leaseRenewalPeriod = 10 * time.Second
)
Expand Down Expand Up @@ -170,7 +169,7 @@ func (h *Hasher) ensureLease(ctx context.Context) error {
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: &h.replicaName,
LeaseDurationSeconds: ptr.To(int32(int64(leaseDuration) / 1_000_000_000)),
LeaseDurationSeconds: ptr.To(int32(leaseDuration / time.Second)),
AcquireTime: ptr.To(metav1.NewMicroTime(now)),
RenewTime: ptr.To(metav1.NewMicroTime(now)),
LeaseTransitions: ptr.To[int32](1),
Expand Down
17 changes: 13 additions & 4 deletions internal/serverboot/serverboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"log/slog"
"net/http"
"os"
"time"

"github.com/agent-substrate/substrate/internal/contextlogging"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -131,9 +132,12 @@ func Fatal(ctx context.Context, msg string, err error) {
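// ShutdownProvider invokes the OTel provider's Shutdown and logs any
// error. Shutdown runs with a 5-second deadline on a context detached
// from ctx's cancellation, so it still gets a chance to run after ctx
// has been canceled. Designed to be deferred from main():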
//
// defer serverboot.ShutdownProvider("TracerProvider", tp.Shutdown)
func ShutdownProvider(name string, shutdown func(context.Context) error) {
if err := shutdown(context.Background()); err != nil {
// defer serverboot.ShutdownProvider(ctx, "TracerProvider", tp.Shutdown)
func ShutdownProvider(ctx context.Context, name string, shutdown func(context.Context) error) {
shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
defer cancel()

if err := shutdown(shutdownCtx); err != nil {
slog.Error("Failed to shutdown "+name, slog.Any("err", err))
}
}
Expand Down Expand Up @@ -161,7 +165,12 @@ func StartMetricsServer(ctx context.Context, opts MetricsServerOptions) {
})
}
slog.InfoContext(ctx, fmt.Sprintf("Starting Prometheus metrics server on %s", opts.Addr))
if err := http.ListenAndServe(opts.Addr, mux); err != nil {
srv := &http.Server{
Addr: opts.Addr,
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}
if err := srv.ListenAndServe(); err != nil {
slog.Error("Failed to start prometheus metrics server", slog.Any("err", err))
}
}
2 changes: 1 addition & 1 deletion internal/servicednssigner/servicednssigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (h *Impl) MakeCert(ctx context.Context, pcr *certsv1beta1.PodCertificateReq
switch svc.Spec.Type {
case corev1.ServiceTypeClusterIP, corev1.ServiceTypeNodePort, corev1.ServiceTypeLoadBalancer:
// ok
default:
case corev1.ServiceTypeExternalName:
// This service type doesn't select pods using a label selector.
continue
}
Expand Down