Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
defaultPort = 0
platform = "platform"
function = "function"
extension = "extension"
Expand Down
59 changes: 38 additions & 21 deletions collector/receiver/telemetryapireceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"context"
crand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -75,30 +76,52 @@ type telemetryAPIReceiver struct {
logReport bool
}

func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
address := listenOnAddress(r.port)
r.logger.Info("Listening for requests", zap.String("address", address))
func (r *telemetryAPIReceiver) Start(ctx context.Context, _ component.Host) error {
if len(r.types) == 0 {
return fmt.Errorf("no telemetry event types provided")
}
listener, address, err := r.bindListener(ctx)
if err != nil {
return fmt.Errorf("failed to bind listener: %w", err)
}
r.logger.Info("Starting telemetry API listener", zap.String("address", address))

mux := http.NewServeMux()
mux.HandleFunc("/", r.httpHandler)
r.httpServer = &http.Server{Addr: address, Handler: mux}
go func() {
_ = r.httpServer.ListenAndServe()
err := r.httpServer.Serve(listener)
if !errors.Is(err, http.ErrServerClosed) {
r.logger.Error("Telemetry API server stopped unexpectedly", zap.Error(err))
} else {
r.logger.Info("Telemetry API server stopped", zap.String("address", address))
}
}()

telemetryClient := telemetryapi.NewClient(r.logger)
if len(r.types) > 0 {
_, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address))
if err != nil {
r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID))
return err
}
if _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)); err != nil {
r.logger.Error("Failed to subscribe to telemetry", zap.Error(err))
_ = r.Shutdown(ctx)
return err
}
r.logger.Info("Successfully subscribed to telemetry", zap.String("address", address))
return nil
}

func (r *telemetryAPIReceiver) bindListener(ctx context.Context) (net.Listener, string, error) {
listenerAddr := listenOnAddress()
var lc net.ListenConfig
l, err := lc.Listen(ctx, "tcp", fmt.Sprintf("%s:%d", listenerAddr, r.port))
if err != nil {
return nil, "", err
}
addr := fmt.Sprintf("%s:%d", l.Addr().Network(), l.Addr().(*net.TCPAddr).Port)
return l, addr, nil
}

func (r *telemetryAPIReceiver) Shutdown(ctx context.Context) error {
return nil
err := r.httpServer.Shutdown(ctx)
return err
}

func newSpanID() pcommon.SpanID {
Expand Down Expand Up @@ -192,9 +215,6 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
}
}
}

r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len()))
slice = nil
}

func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string {
Expand Down Expand Up @@ -534,14 +554,11 @@ func newTelemetryAPIReceiver(
}, nil
}

func listenOnAddress(port int) string {
func listenOnAddress() string {
envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL")
var addr string
if ok && envAwsLocal == "true" {
addr = ":" + strconv.Itoa(port)
return ""
} else {
addr = "sandbox.localdomain:" + strconv.Itoa(port)
return "sandbox.localdomain"
}

return addr
}
8 changes: 4 additions & 4 deletions collector/receiver/telemetryapireceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ func TestListenOnAddress(t *testing.T) {
{
desc: "listen on address without AWS_SAM_LOCAL env variable",
testFunc: func(t *testing.T) {
addr := listenOnAddress(4325)
require.EqualValues(t, "sandbox.localdomain:4325", addr)
addr := listenOnAddress()
require.EqualValues(t, "sandbox.localdomain", addr)
},
},
{
desc: "listen on address with AWS_SAM_LOCAL env variable",
testFunc: func(t *testing.T) {
t.Setenv("AWS_SAM_LOCAL", "true")
addr := listenOnAddress(4325)
require.EqualValues(t, ":4325", addr)
addr := listenOnAddress()
require.EqualValues(t, "", addr)
},
},
}
Expand Down
Loading