Merged
25 commits
9e28406
Infrastructure improvements and bugfixes for vMCP
Jan 26, 2026
8aa930b
fix: Update CallTool and GetPrompt signatures to match BackendClient …
Jan 26, 2026
a6cb6d0
fix: Update ReadResource signature to match BackendClient interface
Jan 26, 2026
e97fcee
fix: Pass selfURL parameter to health.NewMonitor
Jan 26, 2026
3361c90
Fix NewHealthChecker calls in checker_test.go to include selfURL para…
Jan 26, 2026
e275ea1
Merge remote-tracking branch 'origin/main' into optimizer-enablers
Jan 26, 2026
e42e2ac
Fix NewMonitor calls in monitor_test.go to include selfURL parameter
Jan 26, 2026
90db15e
Fix Go import formatting issues (gci linter)
Jan 26, 2026
628f101
Fix Chart.yaml version - restore to 0.0.103
Jan 26, 2026
78f632b
Bump Chart.yaml version to 0.0.104
Jan 26, 2026
b2c8f0d
Update README.md version badge to 0.0.104
Jan 26, 2026
93b849a
Merge branch 'main' into optimizer-enablers
Jan 26, 2026
fac3f5d
Merge branch 'main' into optimizer-enablers
Jan 26, 2026
539e94d
Merge branch 'main' into optimizer-enablers
Jan 26, 2026
b7bfa2d
Merge branch 'main' into optimizer-enablers
Jan 26, 2026
58e3d0b
Refactor vMCP tracing and remove health checker self-check
Jan 27, 2026
cd7b756
Add explanatory comment for MCP SDK Meta limitations
Jan 27, 2026
af70d94
Update test helper comments to clarify pod readiness contract
Jan 27, 2026
bc96636
Complete error capture pattern in MergeCapabilities defer
Jan 27, 2026
5bfa1c1
Merge branch 'main' into optimizer-enablers
Jan 27, 2026
91badb6
Merge branch 'main' into optimizer-enablers
Jan 27, 2026
66b815a
Merge branch 'main' into optimizer-enablers
Jan 27, 2026
c514590
Remove singleflight race condition fix
Jan 27, 2026
ff267fe
Add SPDX license headers to manager.go
Jan 27, 2026
0b7873c
Merge branch 'main' into optimizer-enablers
Jan 27, 2026
3 changes: 2 additions & 1 deletion .gitignore
@@ -42,4 +42,5 @@ cmd/thv-operator/.task/checksum/crdref-gen
 # Test coverage
 coverage*
 
-crd-helm-wrapper
+crd-helm-wrapper
+cmd/vmcp/__debug_bin*
2 changes: 2 additions & 0 deletions .golangci.yml
@@ -139,6 +139,7 @@ linters:
       - third_party$
       - builtin$
       - examples$
+      - scripts$
 formatters:
   enable:
     - gci
@@ -155,3 +156,4 @@ formatters:
       - third_party$
       - builtin$
       - examples$
+      - scripts$
41 changes: 24 additions & 17 deletions cmd/vmcp/app/commands.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
+	"go.opentelemetry.io/otel/trace"
 	"k8s.io/client-go/rest"
 
 	"github.com/stacklok/toolhive/pkg/audit"
@@ -310,8 +311,27 @@ func runServe(cmd *cobra.Command, _ []string) error {
 		return fmt.Errorf("failed to create conflict resolver: %w", err)
 	}
 
-	// Create aggregator
-	agg := aggregator.NewDefaultAggregator(backendClient, conflictResolver, cfg.Aggregation.Tools)
+	// If telemetry is configured, create the provider early so aggregator can use it
+	var telemetryProvider *telemetry.Provider
+	if cfg.Telemetry != nil {
+		telemetryProvider, err = telemetry.NewProvider(ctx, *cfg.Telemetry)
+		if err != nil {
+			return fmt.Errorf("failed to create telemetry provider: %w", err)
+		}
+		defer func() {
+			err := telemetryProvider.Shutdown(ctx)
+			if err != nil {
+				logger.Errorf("failed to shutdown telemetry provider: %v", err)
+			}
+		}()
+	}
+
+	// Create aggregator with tracer provider (nil if telemetry not configured)
+	var tracerProvider trace.TracerProvider
+	if telemetryProvider != nil {
+		tracerProvider = telemetryProvider.TracerProvider()
+	}
+	agg := aggregator.NewDefaultAggregator(backendClient, conflictResolver, cfg.Aggregation.Tools, tracerProvider)
 
 	// Use DynamicRegistry for version-based cache invalidation
 	// Works in both standalone (CLI with YAML config) and Kubernetes (operator-deployed) modes
@@ -381,21 +401,8 @@ func runServe(cmd *cobra.Command, _ []string) error {
 	host, _ := cmd.Flags().GetString("host")
 	port, _ := cmd.Flags().GetInt("port")
 
-	// If telemetry is configured, create the provider.
-	var telemetryProvider *telemetry.Provider
-	if cfg.Telemetry != nil {
-		var err error
-		telemetryProvider, err = telemetry.NewProvider(ctx, *cfg.Telemetry)
-		if err != nil {
-			return fmt.Errorf("failed to create telemetry provider: %w", err)
-		}
-		defer func() {
-			err := telemetryProvider.Shutdown(ctx)
-			if err != nil {
-				logger.Errorf("failed to shutdown telemetry provider: %v", err)
-			}
-		}()
-	}
+	// Note: telemetryProvider was already created earlier (before aggregator creation)
+	// to enable tracing in the aggregator
 
 	// Configure health monitoring if enabled
 	var healthMonitorConfig *health.MonitorConfig
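The ordering is the whole point of this commands.go change: the telemetry provider must exist before `aggregator.NewDefaultAggregator` runs so a real `trace.TracerProvider` can be threaded in, and the explicit nil check avoids calling a method on a nil `*telemetry.Provider`. Below is a minimal sketch of the same wiring using the OpenTelemetry SDK directly; `run`, `startAggregator`, and `telemetryEnabled` are illustrative stand-ins, not the toolhive APIs.

```go
package wiring

import (
	"context"
	"log"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/trace"
)

// startAggregator stands in for aggregator.NewDefaultAggregator: it accepts
// a possibly-nil TracerProvider, as the real constructor now does.
func startAggregator(tp trace.TracerProvider) {
	_ = tp
}

// run sketches the serve-path ordering: build the tracer provider before the
// aggregator and defer its shutdown for the lifetime of the run.
// telemetryEnabled stands in for cfg.Telemetry != nil.
func run(ctx context.Context, telemetryEnabled bool) error {
	var sdkProvider *sdktrace.TracerProvider
	if telemetryEnabled {
		sdkProvider = sdktrace.NewTracerProvider()
		defer func() {
			if err := sdkProvider.Shutdown(ctx); err != nil {
				log.Printf("failed to shutdown tracer provider: %v", err)
			}
		}()
	}

	// Assign through an explicit nil check: storing a nil *TracerProvider
	// in the interface directly would make the interface itself non-nil.
	var tp trace.TracerProvider
	if sdkProvider != nil {
		tp = sdkProvider
	}
	startAggregator(tp)
	return nil
}
```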
2 changes: 2 additions & 0 deletions codecov.yaml
@@ -13,6 +13,8 @@ coverage:
     - "**/mocks/**/*"
     - "**/mock_*.go"
     - "**/zz_generated.deepcopy.go"
+    - "**/*_test.go"
+    - "**/*_test_coverage.go"
   status:
     project:
       default:
4 changes: 2 additions & 2 deletions pkg/runner/config_builder_test.go
@@ -1079,8 +1079,8 @@ func TestRunConfigBuilder_WithRegistryProxyPort(t *testing.T) {
 				ProxyPort:  testPort,
 				TargetPort: testPort,
 			},
-			cliProxyPort:      9000,
-			expectedProxyPort: 9000,
+			cliProxyPort:      9999,
+			expectedProxyPort: 9999,
 		},
 		{
 			name: "random port when neither CLI nor registry specified",
129 changes: 123 additions & 6 deletions pkg/vmcp/aggregator/default_aggregator.go
@@ -8,6 +8,10 @@ import (
 	"fmt"
 	"sync"
 
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
+	"go.opentelemetry.io/otel/trace/noop"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/stacklok/toolhive/pkg/logger"
@@ -21,15 +25,18 @@ type defaultAggregator struct {
 	backendClient    vmcp.BackendClient
 	conflictResolver ConflictResolver
 	toolConfigMap    map[string]*config.WorkloadToolConfig // Maps backend ID to tool config
+	tracer           trace.Tracer
 }
 
 // NewDefaultAggregator creates a new default aggregator implementation.
 // conflictResolver handles tool name conflicts across backends.
 // workloadConfigs specifies per-backend tool filtering and overrides.
+// tracerProvider is used to create a tracer for distributed tracing (pass nil for no tracing).
 func NewDefaultAggregator(
 	backendClient vmcp.BackendClient,
 	conflictResolver ConflictResolver,
 	workloadConfigs []*config.WorkloadToolConfig,
+	tracerProvider trace.TracerProvider,
 ) Aggregator {
 	// Build tool config map for quick lookup by backend ID
 	toolConfigMap := make(map[string]*config.WorkloadToolConfig)
@@ -39,16 +46,38 @@ func NewDefaultAggregator(
 		}
 	}
 
+	// Create tracer from provider (use noop tracer if provider is nil)
+	var tracer trace.Tracer
+	if tracerProvider != nil {
+		tracer = tracerProvider.Tracer("github.com/stacklok/toolhive/pkg/vmcp/aggregator")
+	} else {
+		tracer = noop.NewTracerProvider().Tracer("github.com/stacklok/toolhive/pkg/vmcp/aggregator")
+	}
+
 	return &defaultAggregator{
 		backendClient:    backendClient,
 		conflictResolver: conflictResolver,
 		toolConfigMap:    toolConfigMap,
+		tracer:           tracer,
 	}
 }
 
 // QueryCapabilities queries a single backend for its MCP capabilities.
 // Returns the raw capabilities (tools, resources, prompts) from the backend.
-func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.Backend) (*BackendCapabilities, error) {
+func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.Backend) (_ *BackendCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.QueryCapabilities",
+		trace.WithAttributes(
+			attribute.String("backend.id", backend.ID),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Debugf("Querying capabilities from backend %s", backend.ID)
 
 	// Create a BackendTarget from the Backend
@@ -74,6 +103,12 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
 		SupportsSampling: capabilities.SupportsSampling,
 	}
 
+	span.SetAttributes(
+		attribute.Int("tools.count", len(result.Tools)),
+		attribute.Int("resources.count", len(result.Resources)),
+		attribute.Int("prompts.count", len(result.Prompts)),
+	)
+
 	logger.Debugf("Backend %s: %d tools (after filtering/overrides), %d resources, %d prompts",
 		backend.ID, len(result.Tools), len(result.Resources), len(result.Prompts))
 
@@ -85,7 +120,20 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
 func (a *defaultAggregator) QueryAllCapabilities(
 	ctx context.Context,
 	backends []vmcp.Backend,
-) (map[string]*BackendCapabilities, error) {
+) (_ map[string]*BackendCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.QueryAllCapabilities",
+		trace.WithAttributes(
+			attribute.Int("backends.count", len(backends)),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Infof("Querying capabilities from %d backends", len(backends))
 
 	// Use errgroup for parallel queries with context cancellation
@@ -125,6 +173,10 @@ func (a *defaultAggregator) QueryAllCapabilities(
 		return nil, fmt.Errorf("no backends returned capabilities")
 	}
 
+	span.SetAttributes(
+		attribute.Int("successful.backends", len(capabilities)),
+	)
+
 	logger.Infof("Successfully queried %d/%d backends", len(capabilities), len(backends))
 	return capabilities, nil
 }
@@ -134,7 +186,20 @@ func (a *defaultAggregator) QueryAllCapabilities(
 func (a *defaultAggregator) ResolveConflicts(
 	ctx context.Context,
 	capabilities map[string]*BackendCapabilities,
-) (*ResolvedCapabilities, error) {
+) (_ *ResolvedCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.ResolveConflicts",
+		trace.WithAttributes(
+			attribute.Int("backends.count", len(capabilities)),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Debugf("Resolving conflicts across %d backends", len(capabilities))
 
 	// Group tools by backend for conflict resolution
@@ -191,6 +256,12 @@ func (a *defaultAggregator) ResolveConflicts(
 		resolved.SupportsSampling = resolved.SupportsSampling || caps.SupportsSampling
 	}
 
+	span.SetAttributes(
+		attribute.Int("resolved.tools", len(resolved.Tools)),
+		attribute.Int("resolved.resources", len(resolved.Resources)),
+		attribute.Int("resolved.prompts", len(resolved.Prompts)),
+	)
+
 	logger.Debugf("Resolved %d unique tools, %d resources, %d prompts",
 		len(resolved.Tools), len(resolved.Resources), len(resolved.Prompts))
 
@@ -199,11 +270,26 @@ func (a *defaultAggregator) QueryAllCapabilities(
 
 // MergeCapabilities creates the final unified capability view and routing table.
 // Uses the backend registry to populate full BackendTarget information for routing.
-func (*defaultAggregator) MergeCapabilities(
+func (a *defaultAggregator) MergeCapabilities(
 	ctx context.Context,
 	resolved *ResolvedCapabilities,
 	registry vmcp.BackendRegistry,
-) (*AggregatedCapabilities, error) {
+) (_ *AggregatedCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.MergeCapabilities",
+		trace.WithAttributes(
+			attribute.Int("resolved.tools", len(resolved.Tools)),
+			attribute.Int("resolved.resources", len(resolved.Resources)),
+			attribute.Int("resolved.prompts", len(resolved.Prompts)),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Debugf("Merging capabilities into final view")
 
 	// Create routing table
@@ -304,6 +390,13 @@ func (*defaultAggregator) MergeCapabilities(
 		},
 	}
 
+	span.SetAttributes(
+		attribute.Int("aggregated.tools", aggregated.Metadata.ToolCount),
+		attribute.Int("aggregated.resources", aggregated.Metadata.ResourceCount),
+		attribute.Int("aggregated.prompts", aggregated.Metadata.PromptCount),
+		attribute.String("conflict.strategy", string(aggregated.Metadata.ConflictStrategy)),
+	)
+
 	logger.Infof("Merged capabilities: %d tools, %d resources, %d prompts",
 		aggregated.Metadata.ToolCount, aggregated.Metadata.ResourceCount, aggregated.Metadata.PromptCount)
 
@@ -315,7 +408,23 @@ func (*defaultAggregator) MergeCapabilities(
 // 2. Query all backends
 // 3. Resolve conflicts
 // 4. Merge into final view with full backend information
-func (a *defaultAggregator) AggregateCapabilities(ctx context.Context, backends []vmcp.Backend) (*AggregatedCapabilities, error) {
+func (a *defaultAggregator) AggregateCapabilities(
+	ctx context.Context,
+	backends []vmcp.Backend,
+) (_ *AggregatedCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.AggregateCapabilities",
+		trace.WithAttributes(
+			attribute.Int("backends.count", len(backends)),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Infof("Starting capability aggregation for %d backends", len(backends))
 
 	// Step 1: Create registry from discovered backends
@@ -343,6 +452,14 @@ func (a *defaultAggregator) AggregateCapabilities(ctx context.Context, backends
 	// Update metadata with backend count
 	aggregated.Metadata.BackendCount = len(backends)
 
+	span.SetAttributes(
+		attribute.Int("aggregated.backends", aggregated.Metadata.BackendCount),
+		attribute.Int("aggregated.tools", aggregated.Metadata.ToolCount),
+		attribute.Int("aggregated.resources", aggregated.Metadata.ResourceCount),
+		attribute.Int("aggregated.prompts", aggregated.Metadata.PromptCount),
+		attribute.String("conflict.strategy", string(aggregated.Metadata.ConflictStrategy)),
+	)
+
 	logger.Infof("Capability aggregation complete: %d backends, %d tools, %d resources, %d prompts",
 		aggregated.Metadata.BackendCount, aggregated.Metadata.ToolCount,
 		aggregated.Metadata.ResourceCount, aggregated.Metadata.PromptCount)
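Two patterns recur throughout this file. First, the constructor falls back to a noop tracer, so no call site ever branches on whether tracing is enabled. Second, each method uses a named `retErr` return so a single deferred closure can record the error on the span and set its status before ending it; this is the pattern the "Complete error capture pattern in MergeCapabilities defer" commit finishes. A condensed, self-contained sketch follows; `worker`, `newWorker`, and `do` are illustrative names, while the OpenTelemetry calls match the diff.

```go
package tracingpattern

import (
	"context"
	"errors"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
)

type worker struct {
	tracer trace.Tracer
}

// newWorker falls back to a noop tracer when no provider is supplied,
// so callers never need nil checks around span creation.
func newWorker(tp trace.TracerProvider) *worker {
	if tp == nil {
		tp = noop.NewTracerProvider()
	}
	return &worker{tracer: tp.Tracer("example/worker")}
}

// do mirrors the QueryCapabilities shape: the blank first return keeps the
// signature readable while the named retErr feeds the deferred error capture.
func (w *worker) do(ctx context.Context, id string) (_ string, retErr error) {
	ctx, span := w.tracer.Start(ctx, "worker.do",
		trace.WithAttributes(attribute.String("backend.id", id)),
	)
	defer func() {
		if retErr != nil {
			span.RecordError(retErr)
			span.SetStatus(codes.Error, retErr.Error())
		}
		span.End()
	}()

	if id == "" {
		return "", errors.New("missing backend id")
	}
	_ = ctx // real code would pass ctx to downstream calls

	// Mirror the diff's habit of attaching result counts once known.
	span.SetAttributes(attribute.Int("results.count", 1))
	return "ok", nil
}
```

The named return is what makes the single defer work: an unnamed `error` return would be invisible to the closure, forcing every early `return` to duplicate the RecordError/SetStatus/End sequence.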