Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Package component provides building blocks for enterprise message-driven
// components. A [Descriptor] describes one component — its identity, flags,
// bootstrap function, and pub/sub topology — and a Procedure run through an
// [L] manages concurrent execution, logging, cleanup, and graceful shutdown.
//
// # Tracing contract
//
// The framework does not hold any span across an unbounded [Procedure]. Spans
// in component telemetry come from two places:
//
// - Callers, for their own bounded phases. Loader's Claim.Exec wraps each
// Component's Bootstrap call in a <component>.bootstrap span; per-handler
// instrumentation in subscription loops opens one span per invocation.
// - The framework itself, for runCleanup. l.exec wraps cleanup work in a
// bounded <name>.cleanup span so cleanup-time durations and panics remain
// observable without re-introducing a long-lived parent.
//
// Spans started from [L.Context] are roots of independent traces. l.Context()
// carries no active span — by design, since an unbounded Procedure would
// otherwise produce a single trace that grew with component uptime,
// saturating downstream backends.
//
// # Identity attributes
//
// Lifecycle identity is propagated as attributes through ctx values rather
// than via a parent-child span relationship. [WithAttributes] and
// [WithForkAttributes] stash a slice on the lifecycle's context.
// [NewSpanProcessor] returns an sdktrace.SpanProcessor that reads the slice
// from the parent context of every span and applies the attributes. Register
// it once during TracerProvider setup:
//
// tp := sdktrace.NewTracerProvider(
// sdktrace.WithBatcher(exporter),
// sdktrace.WithSpanProcessor(component.NewSpanProcessor()),
// )
// otel.SetTracerProvider(tp)
//
// Without registration the identity options have no effect on telemetry.
// Forked sub-lifecycles inherit the parent's attribute slice through the ctx
// chain; a fork that sets its own [WithForkAttributes] fully replaces the
// inherited slice (the framework does not merge — callers that want to
// extend must include the inherited values explicitly).
//
// # Effective anchor in trace data
//
// For Components loaded under a [Footprint], the loader package sets
// identity attributes component.name, footprint.identifier,
// footprint.revision, and footprint.solution on each forked Component's L.
// Together, (footprint.identifier, component.name) is the effective
// lifecycle anchor — the attribute pair operators query when filtering
// trace data for one component's spans.
//
// To disambiguate spans emitted by different processes that share the same
// component identity (for example, two replicas loading the exact same
// Footprint), the framework also stamps every span with a process-stable
// nonce as the process.nonce attribute. The nonce is returned by
// [ProcessNonce] and the loader logs it at startup so operators can pivot
// from log lines to the trace data emitted by the same process.
//
// # Handler-scope error recording
//
// [L.Error] and [L.Fatal] previously recorded errors onto the long-lived
// lifecycle span. With that span gone, they are log-only and deprecated.
// Use [L.ErrorContext] and [L.FatalContext] (and the f-variants) for
// errors that should also be recorded on the span active at the call site
// — typically a per-handler span the caller has just opened.
package component
6 changes: 2 additions & 4 deletions examples/direct/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
package main

import (
"fmt"

"github.com/google/uuid"

"github.com/danielorbach/go-component"
Expand Down Expand Up @@ -46,14 +44,14 @@ func main() {
l.Go("prober", func(l *component.L) {
sub, err := binding.LinkInterest(l.Context(), PongAspect)
if err != nil {
l.Fatal(err)
l.FatalContext(l.Context(), err)
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium-low] The example never registers NewSpanProcessor() that its own comments rely on. (re: EntrypointProc at main.go:42 and the comment at pong.go:56)

No non-test file calls otel.SetTracerProvider/NewSpanProcessor(), yet pong.go:56 asserts "component.NewSpanProcessor stamps the span with the lifecycle's identity attributes (component.name=pong, ...)". As shipped, the direct example produces spans with no identity attributes and no process.nonce, so a reader copying it — or following doc.go's "read the loader's process.nonce log line, then pivot to spans" workflow — gets nothing.

Issue #56 asked the example to demonstrate the right instrumentation shape; it shows span creation but not the identity-propagation half. Wiring up a TracerProvider with NewSpanProcessor() in main (even with a stdout exporter) would make the example match its comments.

}
l.CleanupBackground(sub.Shutdown)

for l.Continue() {
msg, err := sub.Receive(l.GraceContext())
if err != nil {
l.Error(fmt.Errorf("receive: %w", err))
l.ErrorfContext(l.Context(), "receive: %w", err)
continue
}
msg.Ack()
Expand Down
2 changes: 1 addition & 1 deletion examples/direct/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var PingComponent = &component.Descriptor{
text := fmt.Sprintf("%s (seq=%d)", options.Data, i)
err := pub.Send(l.Context(), &pubsub.Message{Body: []byte(text)})
if err != nil {
l.Error(fmt.Errorf("send ping: %w", err))
l.ErrorfContext(l.Context(), "send ping: %w", err)
}
case <-l.Stopping():
l.Log("graceful stop")
Expand Down
20 changes: 16 additions & 4 deletions examples/direct/pong.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"fmt"

"github.com/MakeNowJust/heredoc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"gocloud.dev/pubsub"

"github.com/danielorbach/go-component"
)

var pongTracer = otel.Tracer("github.com/danielorbach/go-component/examples/direct/pong")

const PongAspect = "pong"

var PongComponent = &component.Descriptor{
Expand Down Expand Up @@ -41,16 +45,24 @@ var PongComponent = &component.Descriptor{
case errors.Is(err, context.Canceled):
return
case err != nil:
l.Error(fmt.Errorf("receive ping: %w", err))
l.ErrorfContext(l.Context(), "receive ping: %w", err)
continue
}
msg.Ack()

// Per-message work is bounded by the handler invocation, so it
// belongs in its own bounded span. l.Context() is detached
// from any long-lived span; tracer.Start produces a root for
// a new trace, and component.NewSpanProcessor stamps the
// span with the lifecycle's identity attributes
// (component.name=pong, footprint.identifier=..., ...).
ctx, span := pongTracer.Start(l.Context(), "pong.echo",
trace.WithSpanKind(trace.SpanKindConsumer))
echo := "ECHO " + string(msg.Body)
err = pub.Send(l.Context(), &pubsub.Message{Body: []byte(echo)})
if err != nil {
l.Error(fmt.Errorf("send pong: %w", err))
if err := pub.Send(ctx, &pubsub.Message{Body: []byte(echo)}); err != nil {
l.ErrorfContext(ctx, "send pong: %w", err)
}
span.End()
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium] Reference pong.echo span: End() not deferred (leak on panic) + no SetStatus on send error.

span.End() is called inline, so if pub.Send panics the span never ends/exports and the long-running loop accumulates un-ended spans. On a non-panic send error, ErrorfContext records an exception event but the span keeps status Unset, so a backend computing error rate from span status reports 0% errors while every echo is dropped.

The framework's own kafkaloader.handleFootprint calls span.SetStatus(codes.Error, ...) on its error path — this example (the shape issue #56 designated as the per-message reference) contradicts it. Use defer span.End() and set Error status in the failure branch.

}
})

Expand Down
2 changes: 1 addition & 1 deletion examples/embed/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var PingComponent = &component.Descriptor{
case <-t.C:
err := pub.Send(l.Context(), &pubsub.Message{Body: []byte(options.Data)})
if err != nil {
l.Error(fmt.Errorf("send ping: %w", err))
l.ErrorfContext(l.Context(), "send ping: %w", err)
}
case <-l.Stopping():
l.Log("graceful stop")
Expand Down
4 changes: 2 additions & 2 deletions examples/embed/pong.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ var PongComponent = &component.Descriptor{
for l.Continue() {
msg, err := sub.Receive(l.GraceContext())
if err != nil {
l.Error(fmt.Errorf("receive: %w", err))
l.ErrorfContext(l.Context(), "receive: %w", err)
continue
}
msg.Ack()

echo := "ECHO " + string(msg.Body)
err = pub.Send(l.Context(), &pubsub.Message{Body: []byte(echo)})
if err != nil {
l.Error(fmt.Errorf("send: %w", err))
l.ErrorfContext(l.Context(), "send: %w", err)
}
// we do not log this echo, run ProbeComponent to inspect the messages
}
Expand Down
4 changes: 2 additions & 2 deletions examples/embed/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ var ProbeComponent = &component.Descriptor{
l.Log("stopping:", context.Cause(l.Context()))
// the loop will stop because of l.Continue
case errors.Is(err, context.DeadlineExceeded):
l.Errorf("timeout while probing")
l.ErrorfContext(ctx, "timeout while probing")
case err != nil:
l.Errorf("receive: %w", err)
l.ErrorfContext(ctx, "receive: %w", err)
default:
msg.Ack()
l.Log(msg.Body)
Expand Down
4 changes: 2 additions & 2 deletions fileloader/fileloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func Load(footprint []byte, format FormatFlag) {
if err != nil {
loader.InvalidArgument("cannot load footprint: " + err.Error())
}
loader.Load(fp, component.WithSpan("loader"))
loader.Load(fp, component.WithName("loader"))
case FormatJSON:
fp, err := UnmarshalFootprintJSON(footprint)
if err != nil {
loader.InvalidArgument("cannot load footprint: " + err.Error())
}
loader.Load(fp, component.WithSpan("loader"))
loader.Load(fp, component.WithName("loader"))
case FormatAuto:
panic("component/fileloader: cannot auto-detect format from bytes")
default:
Expand Down
2 changes: 1 addition & 1 deletion fileloader/jsonloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type JSONLoader []byte
func (x JSONLoader) Exec(l *component.L) {
fp, err := UnmarshalFootprintJSON(x)
if err != nil {
l.Fatalf("json: %w", err)
l.FatalfContext(l.Context(), "json: %w", err)
}
loader.Load(fp)
}
Expand Down
4 changes: 3 additions & 1 deletion fileloader/yamlloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type YAMLLoader []byte
func (x YAMLLoader) Exec(l *component.L) {
fp, err := x.footprint()
if err != nil {
l.Fatal(fmt.Errorf("yaml: %w", err))
l.FatalfContext(l.Context(), "yaml: %w", err)
}
loader.Load(fp)
}
Expand All @@ -53,7 +53,9 @@ func (x YAMLLoader) footprint() (loader.Footprint, error) {
fp := loader.Footprint{
Name: msg.Name,
Metadata: msg.Metadata,
Solution: msg.Solution,
Identifier: msg.Identifier,
Revision: msg.Revision,
Locations: msg.Locations,
Allocations: make([]*loader.Claim, 0, len(msg.Components)),
}
Expand Down
14 changes: 7 additions & 7 deletions kafkaloader/kafkaloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Main(group string) {

// multiple instances of the same executable cooperatively respect the same
// footprint bus, so that only one of them will load a given footprint.
loader.Entrypoint(Loader{ReplicaSet: group}, component.WithSpan("loader"))
loader.Entrypoint(Loader{ReplicaSet: group}, component.WithName("loader"))
}

type Loader struct {
Expand All @@ -62,7 +62,7 @@ type Loader struct {
func (x Loader) Exec(l *component.L) {
control, err := OpenControlSubscription(x.ReplicaSet)
if err != nil {
l.Fatalf("open footprint-control subscription: %w", err)
l.FatalfContext(l.Context(), "open footprint-control subscription: %w", err)
}
l.CleanupBackground(control.Shutdown)

Expand All @@ -72,28 +72,28 @@ func (x Loader) Exec(l *component.L) {
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(l.GraceContext()), component.ErrStopped) {
break
}
l.Errorf("receive footprint: %w", err)
l.ErrorfContext(l.Context(), "receive footprint: %w", err)
continue
}
m.Ack()

var msg *sarama.ConsumerMessage
if !m.As(&msg) {
l.Fatalf("unexpected message type: %T", m)
l.FatalfContext(l.Context(), "unexpected message type: %T", m)
}
handleFootprint(l, msg)
}
}

func handleFootprint(l *component.L, msg *sarama.ConsumerMessage) {
// TODO: link to producer span
_, span := tracer.Start(l.Context(), "handleFootprint", trace.WithSpanKind(trace.SpanKindConsumer))
ctx, span := tracer.Start(l.Context(), "handleFootprint", trace.WithSpanKind(trace.SpanKindConsumer))
defer span.End()

fp, err := fileloader.UnmarshalFootprintJSON(msg.Value)
if err != nil {
l.Errorf("unmarshal footprint: %w", err)
span.SetStatus(codes.Error, err.Error()) // TODO: how does this look in Jaeger?
l.ErrorfContext(ctx, "unmarshal footprint: %w", err)
span.SetStatus(codes.Error, err.Error())
return
}

Expand Down
Loading