Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
211 changes: 120 additions & 91 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,109 +4,138 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co

## Build Commands

### Build the server binary
```bash
make build
# Or directly:
go build -o bin/snmcp cmd/streamnative-mcp-server/main.go
```

### Run tests
```bash
go test -race ./...
# Run a specific test:
go test -race ./pkg/mcp/builders/...
```

### Docker operations
```bash
make docker-build # Build local Docker image
make build # Build server binary to bin/snmcp
go test -race ./... # Run all tests with race detection
go test -race ./pkg/mcp/builders/... # Run specific package tests
go test -v -run TestName ./pkg/... # Run a single test
make license-check # Check license headers
make license-fix # Fix license headers
make docker-build # Build local Docker image
make docker-build-push # Build and push multi-platform image
```

### License checking
```bash
make license-check # Check license headers
make license-fix # Fix license headers
```

## Architecture Overview

The StreamNative MCP Server implements the Model Context Protocol to enable AI agents to interact with Apache Kafka, Apache Pulsar, and StreamNative Cloud resources.

### Core Components

1. **Session Management** (`pkg/config/`, `pkg/kafka/`, `pkg/pulsar/`)
- Three types of sessions: SNCloudSession, KafkaSession, PulsarSession
- Sessions manage client connections and authentication
- Sessions are created and configured based on command-line flags

2. **MCP Server** (`pkg/mcp/`)
- Central server implementation using `mark3labs/mcp-go` library
- Handles tool registration and request routing
- Features can be enabled/disabled via `--features` flag

3. **Tool Builders** (`pkg/mcp/builders/`)
- Registry pattern for registering MCP tools
- Separate builders for Kafka and Pulsar operations
- Each builder creates tools with specific operations (admin, client, etc.)
The StreamNative MCP Server implements the Model Context Protocol using the `mark3labs/mcp-go` library to enable AI agents to interact with Apache Kafka, Apache Pulsar, and StreamNative Cloud resources.

4. **PFTools** (`pkg/mcp/pftools/`)
- Abstraction layer for Pulsar Functions as MCP tools
- Dynamic tool generation from deployed Pulsar Functions
- Circuit breaker pattern for resilience
- Schema handling for input/output validation
### Request Flow

### Key Design Patterns

1. **Builder Pattern**: Tool builders (`pkg/mcp/builders/`) register tools dynamically based on enabled features
2. **Session Pattern**: Separate sessions for different services (Kafka, Pulsar, SNCloud) with lazy initialization
3. **Registry Pattern**: Central registry (`pkg/mcp/builders/registry.go`) manages all tool builders
4. **Circuit Breaker**: Used in PFTools for handling function invocation failures

## Development Guidelines

### Adding New Tools

1. Create a new builder in `pkg/mcp/builders/kafka/` or `pkg/mcp/builders/pulsar/`
2. Implement the `Builder` interface with `Build()` method
3. Register the builder in the appropriate tools file (e.g., `kafka_admin_*_tools.go`)
4. Add feature flag support in `pkg/mcp/features.go`

### Session Context

The server maintains session context that gets passed to tools via the context:
- Pulsar admin client retrieval: Use `session.GetAdminClient()` or `session.GetAdminV3Client()`
- Kafka client retrieval: Use `session.GetKafkaSession()` methods
- Always check for nil sessions before use

### Error Handling

- Use wrapped errors with context: `fmt.Errorf("failed to X: %w", err)`
- Check session availability before operations
- Return meaningful error messages for AI agent consumption
```
Client Request → MCP Server (pkg/mcp/server.go)
Tool Handler (from builders)
Session Context (pkg/mcp/ctx.go)
Service Client (Kafka/Pulsar/SNCloud)
```

## Testing
### Core Components

Tests follow standard Go testing patterns:
- Unit tests alongside source files (`*_test.go`)
- Use `testify` for assertions
- Mock external dependencies where appropriate
1. **Server & Sessions** (`pkg/mcp/server.go`)
- `Server` struct holds `MCPServer`, `KafkaSession`, `PulsarSession`, and `SNCloudSession`
- Sessions provide lazy-initialized clients for each service
- Context functions (`pkg/mcp/ctx.go`) inject/retrieve sessions from request context

## Important Files
2. **Tool Builders** (`pkg/mcp/builders/`)
- `ToolBuilder` interface: `GetName()`, `GetRequiredFeatures()`, `BuildTools()`, `Validate()`
- `BaseToolBuilder` provides common feature validation logic
- Each builder creates `[]server.ServerTool` with tool definitions and handlers
- Builders in `builders/kafka/` and `builders/pulsar/` implement service-specific tools

- `cmd/streamnative-mcp-server/main.go` - Entry point
- `pkg/cmd/mcp/server.go` - Server setup
- `pkg/mcp/server.go` - MCP server implementation
- `pkg/mcp/builders/registry.go` - Tool registration
- `pkg/mcp/pftools/manager.go` - Pulsar Functions management
- `pkg/config/config.go` - Configuration structures
3. **Tool Registration** (`pkg/mcp/*_tools.go`)
- Each `*_tools.go` file creates a builder, builds tools, and adds them to the server
- Tools are conditionally registered based on `--features` flag
- Feature constants defined in `pkg/mcp/features.go`

## Configuration
4. **PFTools - Functions as Tools** (`pkg/mcp/pftools/`)
- `PulsarFunctionManager` dynamically converts Pulsar Functions to MCP tools
- Polls for function changes and auto-registers/unregisters tools
- Circuit breaker pattern (`circuit_breaker.go`) for fault tolerance
- Schema conversion (`schema.go`) for input/output handling

The server supports three modes:
1. **StreamNative Cloud**: Requires `--organization` and `--key-file`
2. **External Kafka**: Use `--use-external-kafka` with Kafka connection parameters
3. **External Pulsar**: Use `--use-external-pulsar` with Pulsar connection parameters
### Key Design Patterns

When `--pulsar-instance` and `--pulsar-cluster` are provided together, context management tools are disabled as the context is pre-configured.
- **Builder Pattern**: Tool builders create tools based on features and read-only mode
- **Context Injection**: Sessions passed via `context.Context` using typed keys
- **Feature Flags**: Tools enabled/disabled via string feature identifiers
- **Circuit Breaker**: PFTools uses failure thresholds to prevent cascading failures

## Adding New Tools

1. **Create Builder** in `pkg/mcp/builders/kafka/` or `pkg/mcp/builders/pulsar/`:
```go
type MyToolBuilder struct {
*builders.BaseToolBuilder
}

func NewMyToolBuilder() *MyToolBuilder {
metadata := builders.ToolMetadata{
Name: "my_tool",
Description: "Tool description",
Category: "kafka_admin",
}
features := []string{"kafka-admin", "all", "all-kafka"}
return &MyToolBuilder{
BaseToolBuilder: builders.NewBaseToolBuilder(metadata, features),
}
}

func (b *MyToolBuilder) BuildTools(ctx context.Context, config builders.ToolBuildConfig) ([]server.ServerTool, error) {
if !b.HasAnyRequiredFeature(config.Features) {
return nil, nil
}
// Build and return tools
}
```

2. **Add Feature Constant** in `pkg/mcp/features.go` if needed

3. **Create Registration File** `pkg/mcp/my_tools.go`:
```go
func AddMyTools(s *server.MCPServer, readOnly bool, features []string) {
builder := kafkabuilders.NewMyToolBuilder()
config := builders.ToolBuildConfig{ReadOnly: readOnly, Features: features}
tools, _ := builder.BuildTools(context.Background(), config)
for _, tool := range tools {
s.AddTool(tool.Tool, tool.Handler)
}
}
```

4. **Get Session in Handler**:
```go
session := mcpCtx.GetKafkaSession(ctx) // or GetPulsarSession
if session == nil {
return mcp.NewToolResultError("session not found"), nil
}
admin, err := session.GetAdminClient()
```

## Session Context Access

Handlers receive sessions via context (see `pkg/mcp/internal/context/ctx.go`):
- `mcpCtx.GetKafkaSession(ctx)` → `*kafka.Session`
- `mcpCtx.GetPulsarSession(ctx)` → `*pulsar.Session`
- `mcpCtx.GetSNCloudSession(ctx)` → `*config.Session`

From sessions:
- `session.GetAdminClient()` / `session.GetAdminV3Client()` for Pulsar admin
- `session.GetPulsarClient()` for Pulsar messaging
- `session.GetAdminClient()` for Kafka admin (via franz-go/kadm)

## Configuration Modes

1. **StreamNative Cloud**: `--organization` + `--key-file`
2. **External Kafka**: `--use-external-kafka` + Kafka params
3. **External Pulsar**: `--use-external-pulsar` + Pulsar params

Pre-configured context: `--pulsar-instance` + `--pulsar-cluster` disables context management tools.

## Error Handling

- Wrap errors: `fmt.Errorf("failed to X: %w", err)`
- Return tool errors: `mcp.NewToolResultError("message")`
- Check session nil before operations
- For PFTools, use circuit breaker to handle repeated failures
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Copyright 2025 StreamNative
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Multi-stage build for multi-platform support
FROM --platform=$BUILDPLATFORM golang:1.24-alpine AS builder

Expand Down
14 changes: 14 additions & 0 deletions Dockerfile.goreleaser
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Copyright 2025 StreamNative
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Minimal Alpine image for GoReleaser builds
FROM alpine:3.21

Expand Down
25 changes: 11 additions & 14 deletions cmd/streamnative-mcp-server/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Copyright 2025 StreamNative
//
// http://www.apache.org/licenses/LICENSE-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

Expand Down
18 changes: 18 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF
github.com/cpuguy83/go-md2man/v2 v2.0.6 h1:XJtiaUW6dEEqVuZiMTn1ldk455QWwEIsMIJlo5vtkx0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible h1:dvc1KSkIYTVjZgHf/CTC2diTYC8PzhaA5sFISRfNVrE=
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/envoyproxy/go-control-plane v0.9.4 h1:rEvIZUSZ3fx39WIi3JkQqQBitGwpELBIYWeBVh6wn+E=
github.com/envoyproxy/go-control-plane v0.13.1 h1:vPfJZCkob6yTMEgS+0TwfTUfbHjfy/6vOJ8hUWX/uXE=
github.com/envoyproxy/go-control-plane v0.13.1/go.mod h1:X45hY0mufo6Fd0KW3rqsGvQMw58jvjymeCzBU3mWyHw=
Expand All @@ -82,6 +84,8 @@ github.com/go-redis/redis v6.15.6+incompatible h1:H9evprGPLI8+ci7fxQx6WNZHJSb7be
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-viper/mapstructure/v2 v2.3.0 h1:27XbWsHIqhbdR5TIC911OfYvgSaW93HM+dX7970Q7jk=
github.com/go-viper/mapstructure/v2 v2.3.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
github.com/golang-jwt/jwt/v4 v4.5.1 h1:JdqV9zKUdtaa9gdPlywC3aeoEsR681PlKC+4F5gQgeo=
github.com/golang-jwt/jwt/v4 v4.5.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
Expand Down Expand Up @@ -142,6 +146,8 @@ github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/mark3labs/mcp-go v0.23.1 h1:RzTzZ5kJ+HxwnutKA4rll8N/pKV6Wh5dhCmiJUu5S9I=
github.com/mark3labs/mcp-go v0.23.1/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mark3labs/mcp-go v0.34.0 h1:eWy7WBGvhk6EyAAyVzivTCprE52iXJwNtvHV6Cv3bR0=
github.com/mark3labs/mcp-go v0.34.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
Expand Down Expand Up @@ -194,11 +200,17 @@ go.opentelemetry.io/otel/sdk/metric v1.29.0 h1:K2CfmJohnRgvZ9UAj2/FhIf/okdWcNdBw
go.opentelemetry.io/otel/sdk/metric v1.29.0/go.mod h1:6zZLdCl2fkauYoZIOn/soQIDSWFmNSRcICarHfuhNJQ=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b h1:+qEpEAPhDZ1o0x3tHzZTQDArnOixOzGD9HUJfcg0mb4=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM=
golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
Expand All @@ -210,7 +222,13 @@ golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642 h1:B6caxRw+hozq68X2MY7jEpZh/cr4/aHLv9xU8Kkadrw=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
25 changes: 11 additions & 14 deletions pkg/auth/auth.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Copyright 2025 StreamNative
//
// http://www.apache.org/licenses/LICENSE-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

Expand Down
Loading