Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ func NewRootCmd(cfg *env.Env, tel *telemetry.Client, logger log.Logger) *cobra.C

func Execute(ctx context.Context) error {
cfg := env.Init()
tel := telemetry.New(cfg.AnalyticsEndpoint, cfg.DisableEvents)
defer tel.Close()

logger, cleanup, err := newLogger()
if err != nil {
Expand All @@ -108,6 +106,10 @@ func Execute(ctx context.Context) error {
logger.Error("failed to shut down tracing: %v", err)
}
}()

tel := telemetry.New(cfg.AnalyticsEndpoint, cfg.DisableEvents)
defer tel.Close()

logger.Info("lstk %s starting", version.Version())

// Resolve auth token for telemetry: keyring first, then env var.
Expand Down
18 changes: 18 additions & 0 deletions internal/awscli/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,22 @@ import (
"os/exec"
"strings"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/localstack/lstk/internal/awsconfig"
"github.com/localstack/lstk/internal/output"
)

func Exec(ctx context.Context, endpointURL string, useProfile bool, stdout, stderr io.Writer, args []string) error {
ctx, span := otel.Tracer("github.com/localstack/lstk/internal/awscli").Start(ctx, "aws cli")
defer span.End()

awsBin, err := exec.LookPath("aws")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("aws CLI not found in PATH — install it from https://aws.amazon.com/cli/")
}

Expand All @@ -30,6 +39,11 @@ func Exec(ctx context.Context, endpointURL string, useProfile bool, stdout, stde
}
cmdArgs = append(cmdArgs, args...)

span.SetAttributes(
attribute.StringSlice("aws.args", args),
attribute.Bool("aws.use_profile", useProfile),
)

cmd := exec.CommandContext(ctx, awsBin, cmdArgs...)
cmd.Stdin = os.Stdin
cmd.Stdout = stdout
Expand All @@ -41,8 +55,12 @@ func Exec(ctx context.Context, endpointURL string, useProfile bool, stdout, stde
if err := cmd.Run(); err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
span.SetAttributes(attribute.Int("aws.exit_code", exitErr.ExitCode()))
span.SetStatus(codes.Error, "aws cli exited non-zero")
return output.NewSilentError(err)
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
return nil
Expand Down
16 changes: 12 additions & 4 deletions internal/telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"time"

"github.com/google/uuid"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/localstack/lstk/internal/version"
)

Expand All @@ -28,9 +30,10 @@ type Client struct {
httpClient *http.Client
endpoint string

events chan eventBody
done chan struct{}
closeOnce sync.Once
events chan eventBody
done chan struct{}
closeOnce sync.Once
machineIDOnce sync.Once
}

// SetAuthToken stores the resolved auth token for inclusion in telemetry events.
Expand All @@ -46,11 +49,16 @@ func New(endpoint string, disabled bool) *Client {
c := &Client{
enabled: true,
sessionID: uuid.NewString(),
machineID: LoadOrCreateMachineID(),
// http.Client has no default timeout (zero means none). Without one, a
// slow or unreachable endpoint would block the worker goroutine.
httpClient: &http.Client{
Timeout: 3 * time.Second,
Transport: otelhttp.NewTransport(
http.DefaultTransport,
otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
return "telemetry " + r.Method + " " + r.URL.Path
}),
),
},
endpoint: endpoint,
events: make(chan eventBody, 64),
Expand Down
16 changes: 8 additions & 8 deletions internal/telemetry/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,24 @@ func ToMap(v any) map[string]any {

// GetEnvironment returns the common environment payload for telemetry events,
// using the auth token set via SetAuthToken.
func (c *Client) GetEnvironment() Environment {
env := Environment{
func (c *Client) GetEnvironment(ctx context.Context) Environment {
c.machineIDOnce.Do(func() {
c.machineID = LoadOrCreateMachineID(ctx)
})
return Environment{
LstkVersion: version.Version(),
AuthTokenID: c.authToken,
OS: runtime.GOOS,
Arch: runtime.GOARCH,
MachineID: c.machineID,
}
if c.machineID != "" {
env.MachineID = c.machineID
}
return env
}

// EmitCommand emits an lstk_command telemetry event. The Environment block is
// populated automatically from the client state.
func (c *Client) EmitCommand(ctx context.Context, command string, flags []string, durationMS int64, exitCode int, errorMsg string) {
c.Emit(ctx, "lstk_command", ToMap(CommandEvent{
Environment: c.GetEnvironment(),
Environment: c.GetEnvironment(ctx),
Parameters: CommandParameters{Command: command, Flags: flags},
Result: CommandResult{
DurationMS: durationMS,
Expand All @@ -122,6 +122,6 @@ func (c *Client) EmitCommand(ctx context.Context, command string, flags []string
// Environment field is populated automatically from the client state; any
// value set by the caller is overwritten.
func (c *Client) EmitEmulatorLifecycleEvent(ctx context.Context, event LifecycleEvent) {
event.Environment = c.GetEnvironment()
event.Environment = c.GetEnvironment(ctx)
c.Emit(ctx, "lstk_lifecycle", ToMap(event))
}
4 changes: 2 additions & 2 deletions internal/telemetry/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func drainEvent(t *testing.T, tel *Client, ch <-chan map[string]any) map[string]
func TestGetEnvironment_PopulatesAllFields(t *testing.T) {
c := New("http://localhost", false)
c.SetAuthToken("ls-abc123")
env := c.GetEnvironment()
env := c.GetEnvironment(context.Background())

assert.Equal(t, version.Version(), env.LstkVersion)
assert.Equal(t, "ls-abc123", env.AuthTokenID)
Expand All @@ -59,7 +59,7 @@ func TestGetEnvironment_PopulatesAllFields(t *testing.T) {

func TestGetEnvironment_OmitsAuthTokenWhenEmpty(t *testing.T) {
c := New("http://localhost", false)
env := c.GetEnvironment()
env := c.GetEnvironment(context.Background())
assert.Empty(t, env.AuthTokenID)
}

Expand Down
21 changes: 16 additions & 5 deletions internal/telemetry/machine_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"crypto/md5"
"encoding/hex"
"net/http"
"os"
"path/filepath"
"strings"

dockerclient "github.com/docker/docker/client"
"github.com/google/uuid"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/localstack/lstk/internal/config"
)

Expand All @@ -22,8 +25,8 @@ const (
// trying in order: Docker daemon ID, /etc/machine-id, then a persisted random UUID.
// Prefixes (dkr_, sys_, gen_) indicate origin, matching the Python implementation
// in localstack-core so IDs can be correlated across tools.
func LoadOrCreateMachineID() string {
if id := dockerDaemonID(); id != "" {
func LoadOrCreateMachineID(ctx context.Context) string {
if id := dockerDaemonID(ctx); id != "" {
return "dkr_" + anonymize(id)
}
if id := systemMachineID(); id != "" {
Expand All @@ -37,13 +40,21 @@ func anonymize(physicalID string) string {
return hex.EncodeToString(h[:])[:12]
}

func dockerDaemonID() string {
c, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
func dockerDaemonID(ctx context.Context) string {
c, err := dockerclient.NewClientWithOpts(
dockerclient.FromEnv,
dockerclient.WithAPIVersionNegotiation(),
dockerclient.WithTraceOptions(
otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
return "docker " + r.Method + " " + r.URL.Path
}),
),
)
if err != nil {
return ""
}
defer func() { _ = c.Close() }()
info, err := c.Info(context.Background())
info, err := c.Info(ctx)
if err != nil {
return ""
}
Expand Down
2 changes: 2 additions & 0 deletions test/integration/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
DisableEvents Key = "LOCALSTACK_DISABLE_EVENTS"
Home Key = "HOME"
Persistence Key = "LOCALSTACK_PERSISTENCE"
Otel Key = "LSTK_OTEL"
OtelEndpoint Key = "OTEL_EXPORTER_OTLP_ENDPOINT"
)

func Get(key Key) string {
Expand Down
75 changes: 75 additions & 0 deletions test/integration/telemetry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package integration_test

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
Expand All @@ -9,6 +11,7 @@ import (
"os"
"os/exec"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -232,6 +235,78 @@ func assertCommandTelemetry(t *testing.T, events <-chan map[string]any, command
assert.InDelta(t, exitCode, result["exit_code"], 0)
}

// TestOtelTelemetrySpanIsExported verifies that the span created for the
// analytics POST request reaches the OTLP collector.
func TestOtelTelemetrySpanIsExported(t *testing.T) {
requireDocker(t)
cleanup()
t.Cleanup(cleanup)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

startTestContainer(t, ctx)

analyticsSrv, _ := mockAnalyticsServer(t)
otlpSrv, otlpBodies := mockOTLPCollector(t)

cmd := exec.CommandContext(ctx, binaryPath(), "start")
cmd.Env = env.Environ(testEnvWithHome(t.TempDir(), "")).
With(env.AuthToken, "fake-token").
With(env.AnalyticsEndpoint, analyticsSrv.URL).
With(env.Otel, "1").
With(env.OtelEndpoint, otlpSrv.URL)
out, err := cmd.CombinedOutput()
require.NoError(t, err, "lstk start failed: %s", out)

deadline := time.After(5 * time.Second)
for {
select {
case body := <-otlpBodies:
// otlptracehttp serializes spans as protobuf; UTF-8 strings appear
// inline in the wire format, so a substring search is sufficient.
if bytes.Contains(body, []byte("telemetry POST")) {
return
}
case <-deadline:
t.Fatal("timed out waiting for telemetry span in OTLP export — likely tel.Close() ran after tracing shutdown")
}
}
}

// mockOTLPCollector returns a test server that accepts OTLP/HTTP trace exports
// and forwards each (decompressed) request body to the returned channel.
func mockOTLPCollector(t *testing.T) (*httptest.Server, <-chan []byte) {
t.Helper()
bodies := make(chan []byte, 16)
var once sync.Once
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var reader io.Reader = r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
gz, err := gzip.NewReader(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer func() { _ = gz.Close() }()
reader = gz
}
body, err := io.ReadAll(reader)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
select {
case bodies <- body:
default:
once.Do(func() { t.Logf("OTLP body channel full, dropping payload of %d bytes", len(body)) })
}
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)
return srv, bodies
}

// collects events until count distinct event names have been received or the deadline expires.
func collectTelemetryByName(t *testing.T, events <-chan map[string]any, count int) map[string]map[string]any {
t.Helper()
Expand Down
Loading