Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,24 @@ From sessions:
1. **StreamNative Cloud**: `--organization` + `--key-file`
2. **External Kafka**: `--use-external-kafka` + Kafka params
3. **External Pulsar**: `--use-external-pulsar` + Pulsar params
4. **Multi-Session Pulsar** (SSE only): `--use-external-pulsar` + `--multi-session-pulsar`

Pre-configured context: `--pulsar-instance` + `--pulsar-cluster` disables context management tools.

### Multi-Session Pulsar Mode

When `--multi-session-pulsar` is enabled (SSE server with external Pulsar only):

- **No global PulsarSession**: Each request must provide its own token via `Authorization: Bearer <token>` header
- **HTTP 401 on auth failure**: Requests without valid tokens are rejected with HTTP 401 Unauthorized
- **Per-user session caching**: Sessions are cached using LRU with configurable size and TTL
- **Session management**: See `pkg/mcp/session/pulsar_session_manager.go`

Key files:
- `pkg/cmd/mcp/sse.go` - Auth middleware wraps SSEHandler()/MessageHandler()
- `pkg/mcp/session/pulsar_session_manager.go` - LRU session cache with TTL cleanup
- `pkg/cmd/mcp/server.go` - Skips global PulsarSession when multi-session enabled

## Error Handling

- Wrap errors: `fmt.Errorf("failed to X: %w", err)`
Expand Down
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,41 @@ snmcp sse --http-addr :9090 --http-path /mcp --use-external-pulsar --pulsar-web-
docker run -i --rm -e SNMCP_ORGANIZATION=my-org -e SNMCP_KEY_FILE=/key.json -v /path/to/key-file.json:/key.json -p 9090:9090 streamnative/snmcp sse
```

#### Multi-Session Pulsar Mode (SSE only)

When running the SSE server with external Pulsar, you can enable **multi-session mode** to support per-user authentication. In this mode, each HTTP request must include an `Authorization: Bearer <token>` header, and the server will create separate Pulsar sessions for each unique token.

```bash
# Start SSE server with multi-session Pulsar mode
snmcp sse --http-addr :9090 --http-path /mcp \
--use-external-pulsar \
--pulsar-web-service-url http://pulsar.example.com:8080 \
--multi-session-pulsar \
--session-cache-size 100 \
--session-ttl-minutes 30
```

**Key features:**
- **Per-user sessions**: Each user's Pulsar token creates a separate session
- **LRU caching**: Sessions are cached with LRU eviction when the cache is full
- **TTL-based cleanup**: Idle sessions are automatically cleaned up after the configured TTL
- **Strict authentication**: Requests without a valid `Authorization` header receive HTTP 401 Unauthorized

**Authentication flow:**
1. Client connects to SSE endpoint with `Authorization: Bearer <pulsar-jwt-token>` header
2. Server validates the token by attempting to create a Pulsar session
3. If valid, the session is cached and reused for subsequent requests
4. If invalid or missing, server returns HTTP 401 Unauthorized

**Configuration options:**
| Flag | Default | Description |
|------|---------|-------------|
| `--multi-session-pulsar` | `false` | Enable per-user Pulsar sessions |
| `--session-cache-size` | `100` | Maximum number of cached sessions |
| `--session-ttl-minutes` | `30` | Session idle timeout before eviction |

> **Note:** Multi-session mode is only available for external Pulsar mode (`--use-external-pulsar`) and only works with the SSE server, not stdio.

### Command-line Options

```
Expand Down Expand Up @@ -175,6 +210,9 @@ Flags:
--use-external-pulsar Use external Pulsar
--http-addr string HTTP server address (default ":9090")
--http-path string HTTP server path for SSE endpoint (default "/mcp")
--multi-session-pulsar Enable per-user Pulsar sessions based on Authorization header tokens (only for external Pulsar mode)
--session-cache-size int Maximum number of cached Pulsar sessions when multi-session is enabled (default 100)
--session-ttl-minutes int Session TTL in minutes before eviction when multi-session is enabled (default 30)
-v, --version version for snmcp
```

Expand Down
37 changes: 21 additions & 16 deletions pkg/cmd/mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,29 @@ func newMcpServer(_ context.Context, configOpts *ServerOptions, logrusLogger *lo
}
case snConfig.ExternalPulsar != nil:
{
psession, err := pulsar.NewSession(pulsar.PulsarContext{
ServiceURL: snConfig.ExternalPulsar.ServiceURL,
WebServiceURL: snConfig.ExternalPulsar.WebServiceURL,
AuthPlugin: snConfig.ExternalPulsar.AuthPlugin,
AuthParams: snConfig.ExternalPulsar.AuthParams,
Token: snConfig.ExternalPulsar.Token,
TLSAllowInsecureConnection: snConfig.ExternalPulsar.TLSAllowInsecureConnection,
TLSEnableHostnameVerification: snConfig.ExternalPulsar.TLSEnableHostnameVerification,
TLSTrustCertsFilePath: snConfig.ExternalPulsar.TLSTrustCertsFilePath,
TLSCertFile: snConfig.ExternalPulsar.TLSCertFile,
TLSKeyFile: snConfig.ExternalPulsar.TLSKeyFile,
})
if err != nil {
return nil, errors.Wrap(err, "failed to set external Pulsar context")
}
mcpServer = mcp.NewServer("streamnative-mcp-server", "0.0.1", logrusLogger, server.WithInstructions(mcp.GetExternalPulsarServerInstructions(snConfig.ExternalPulsar.WebServiceURL)))
mcpServer.PulsarSession = psession
s = mcpServer.MCPServer

// Only create global PulsarSession if not in multi-session mode
// In multi-session mode, each request must provide its own token via Authorization header
if !configOpts.MultiSessionPulsar {
psession, err := pulsar.NewSession(pulsar.PulsarContext{
ServiceURL: snConfig.ExternalPulsar.ServiceURL,
WebServiceURL: snConfig.ExternalPulsar.WebServiceURL,
AuthPlugin: snConfig.ExternalPulsar.AuthPlugin,
AuthParams: snConfig.ExternalPulsar.AuthParams,
Token: snConfig.ExternalPulsar.Token,
TLSAllowInsecureConnection: snConfig.ExternalPulsar.TLSAllowInsecureConnection,
TLSEnableHostnameVerification: snConfig.ExternalPulsar.TLSEnableHostnameVerification,
TLSTrustCertsFilePath: snConfig.ExternalPulsar.TLSTrustCertsFilePath,
TLSCertFile: snConfig.ExternalPulsar.TLSCertFile,
TLSKeyFile: snConfig.ExternalPulsar.TLSKeyFile,
})
if err != nil {
return nil, errors.Wrap(err, "failed to set external Pulsar context")
}
mcpServer.PulsarSession = psession
}
}
default:
{
Expand Down
81 changes: 70 additions & 11 deletions pkg/cmd/mcp/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func runSseServer(configOpts *ServerOptions) error {
TLSKeyFile: snConfig.ExternalPulsar.TLSKeyFile,
},
}
pulsarSessionManager = session.NewPulsarSessionManager(managerConfig, mcpServer.PulsarSession, logger)
// Pass nil as globalSession - in multi-session mode, every request must have a valid token
pulsarSessionManager = session.NewPulsarSessionManager(managerConfig, nil, logger)
logger.Info("Multi-session Pulsar mode enabled")
}

Expand All @@ -125,14 +126,16 @@ func runSseServer(configOpts *ServerOptions) error {
// Handle per-user Pulsar sessions
if pulsarSessionManager != nil {
token := session.ExtractBearerToken(r)
// Token is already validated in auth middleware, this should always succeed
if pulsarSession, err := pulsarSessionManager.GetOrCreateSession(ctx, token); err == nil {
c = context2.WithPulsarSession(c, pulsarSession)
if token != "" {
c = session.WithUserTokenHash(c, pulsarSessionManager.HashTokenForLog(token))
}
} else {
logger.WithError(err).Warn("Failed to get per-user Pulsar session, using global")
c = context2.WithPulsarSession(c, mcpServer.PulsarSession)
// Should not happen since middleware validates token first
logger.WithError(err).Error("Unexpected auth error after middleware validation")
// Don't set PulsarSession - tool handlers will fail gracefully with "session not found"
}
} else {
c = context2.WithPulsarSession(c, mcpServer.PulsarSession)
Expand All @@ -144,16 +147,62 @@ func runSseServer(configOpts *ServerOptions) error {

// 4. Expose the full SSE URL to the user
ssePath := sseServer.CompleteSsePath()
msgPath := sseServer.CompleteMessagePath()
fmt.Fprintf(os.Stderr, "StreamNative Cloud MCP Server listening on http://%s%s\n",
configOpts.HTTPAddr, ssePath)

// 5. Run the HTTP listener in a goroutine
errCh := make(chan error, 1)
go func() {
if err := sseServer.Start(configOpts.HTTPAddr); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err // bubble up real crashes
var httpServer *http.Server

if pulsarSessionManager != nil {
// Multi-session mode: use custom handlers with auth middleware
mux := http.NewServeMux()

// Auth middleware wrapper that validates token before processing
authMiddleware := func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token := session.ExtractBearerToken(r)
if token == "" {
w.Header().Set("Content-Type", "application/json")
http.Error(w, `{"error":"missing Authorization header"}`, http.StatusUnauthorized)
return
}
// Pre-validate token by attempting to get/create session
if _, err := pulsarSessionManager.GetOrCreateSession(r.Context(), token); err != nil {
logger.WithError(err).Warn("Authentication failed")
w.Header().Set("Content-Type", "application/json")
http.Error(w, `{"error":"authentication failed"}`, http.StatusUnauthorized)
return
}
next.ServeHTTP(w, r)
})
}
}()

// Mount handlers with auth middleware
mux.Handle(ssePath, authMiddleware(sseServer.SSEHandler()))
mux.Handle(msgPath, authMiddleware(sseServer.MessageHandler()))

// Start custom HTTP server
httpServer = &http.Server{
Addr: configOpts.HTTPAddr,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second, // Prevent Slowloris attacks
}
go func() {
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err
}
}()
logger.Info("SSE server started with authentication middleware")
} else {
// Non-multi-session mode: use default Start()
go func() {
if err := sseServer.Start(configOpts.HTTPAddr); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err // bubble up real crashes
}
}()
}

// Give the server a moment to start
time.Sleep(100 * time.Millisecond)
Expand All @@ -177,10 +226,20 @@ func runSseServer(configOpts *ServerOptions) error {
pulsarSessionManager.Stop()
}

// First try to shut down the SSE server
if err := sseServer.Shutdown(shCtx); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
logger.Errorf("Error shutting down SSE server: %v", err)
// Shut down the HTTP server
if httpServer != nil {
// Multi-session mode: shut down custom HTTP server
if err := httpServer.Shutdown(shCtx); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
logger.Errorf("Error shutting down HTTP server: %v", err)
}
}
} else {
// Non-multi-session mode: shut down SSE server
if err := sseServer.Shutdown(shCtx); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
logger.Errorf("Error shutting down SSE server: %v", err)
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/mcp/session/pulsar_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func NewPulsarSessionManager(
func (m *PulsarSessionManager) GetOrCreateSession(_ context.Context, token string) (*pulsar.Session, error) {
if token == "" {
// Return global session when no token provided
// If no global session exists (multi-session mode), return error
if m.globalSession == nil {
return nil, fmt.Errorf("authentication required: missing Authorization header")
}
return m.globalSession, nil
}

Expand Down