Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
branches: [main]

jobs:
test:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/codestz/mcpx

go 1.24
go 1.26.1

require (
github.com/fatih/color v1.18.0
Expand Down
124 changes: 115 additions & 9 deletions internal/cli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,87 @@ func parseToolFlagsInternal(tool *mcp.Tool, rawArgs []string, skipRequired bool)
}

// connectServer resolves variables and connects to the MCP server.
// If the server has daemon: true, it ensures a daemon is running and connects
// via unix socket. Otherwise, it spawns a fresh subprocess.
// Routes to the appropriate transport based on config: http, sse, daemon, or stdio.
func connectServer(ctx context.Context, name string, sc *config.ServerConfig) (*mcp.Client, func(), error) {
timeout := 30 * time.Second
if sc.StartupTimeout != "" {
if d, parseErr := time.ParseDuration(sc.StartupTimeout); parseErr == nil {
timeout = d
}
}

switch sc.Transport {
case "http":
return connectHTTP(ctx, name, sc, timeout)
case "sse":
return connectSSE(ctx, name, sc, timeout)
default:
return connectStdio(ctx, name, sc, timeout)
}
}

// connectHTTP connects via the Streamable HTTP transport.
func connectHTTP(ctx context.Context, name string, sc *config.ServerConfig, timeout time.Duration) (*mcp.Client, func(), error) {
headers, err := resolveHeaders(sc)
if err != nil {
return nil, nil, fmt.Errorf("server %q: %w", name, err)
}

url, err := resolveURL(sc)
if err != nil {
return nil, nil, fmt.Errorf("server %q: %w", name, err)
}

transport := mcp.NewHTTPTransport(url, headers)
client := mcp.NewClient(transport)

initCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

if err := client.Initialize(initCtx); err != nil {
transport.Close()
return nil, nil, fmt.Errorf("server %q: initialize: %w", name, err)
}

cleanup := func() { client.Close() }
return client, cleanup, nil
}

// connectSSE connects via the legacy SSE transport.
func connectSSE(ctx context.Context, name string, sc *config.ServerConfig, timeout time.Duration) (*mcp.Client, func(), error) {
headers, err := resolveHeaders(sc)
if err != nil {
return nil, nil, fmt.Errorf("server %q: %w", name, err)
}

url, err := resolveURL(sc)
if err != nil {
return nil, nil, fmt.Errorf("server %q: %w", name, err)
}

// Use the parent context for the SSE stream — not a short-lived timeout context,
// since the SSE body reader stays open for the lifetime of the transport.
transport, err := mcp.NewSSETransport(ctx, url, headers)
if err != nil {
return nil, nil, fmt.Errorf("server %q: sse connect: %w", name, err)
}

client := mcp.NewClient(transport)

initCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

if err := client.Initialize(initCtx); err != nil {
transport.Close()
return nil, nil, fmt.Errorf("server %q: initialize: %w", name, err)
}

cleanup := func() { client.Close() }
return client, cleanup, nil
}

// connectStdio connects via stdio (direct or daemon mode).
func connectStdio(ctx context.Context, name string, sc *config.ServerConfig, timeout time.Duration) (*mcp.Client, func(), error) {
resolvedArgs, resolvedEnv, err := resolveServerConfig(sc)
if err != nil {
return nil, nil, fmt.Errorf("server %q: %w", name, err)
Expand All @@ -465,13 +543,6 @@ func connectServer(ctx context.Context, name string, sc *config.ServerConfig) (*
envSlice = append(envSlice, k+"="+v)
}

timeout := 30 * time.Second
if sc.StartupTimeout != "" {
if d, parseErr := time.ParseDuration(sc.StartupTimeout); parseErr == nil {
timeout = d
}
}

// Daemon mode: connect via unix socket to a long-running process.
if sc.Daemon {
socketPath, err := daemon.EnsureRunning(ctx, name, sc.Command, resolvedArgs, envSlice, timeout)
Expand Down Expand Up @@ -510,6 +581,41 @@ func connectServer(ctx context.Context, name string, sc *config.ServerConfig) (*
return client, cleanup, nil
}

// resolveHeaders builds the HTTP headers map from config, including auth.
func resolveHeaders(sc *config.ServerConfig) (map[string]string, error) {
projectRoot, _ := findProjectRoot()
secrets := secret.NewKeyringStore()
res := resolver.New(projectRoot, secrets)

headers := make(map[string]string)
for k, v := range sc.Headers {
resolved, err := res.Resolve(v)
if err != nil {
return nil, fmt.Errorf("resolve header %q: %w", k, err)
}
headers[k] = resolved
}

// Apply auth config.
if sc.Auth != nil && sc.Auth.Type == "bearer" && sc.Auth.Token != "" {
token, err := res.Resolve(sc.Auth.Token)
if err != nil {
return nil, fmt.Errorf("resolve auth token: %w", err)
}
headers["Authorization"] = "Bearer " + token
}

return headers, nil
}

// resolveURL resolves dynamic variables in the server URL.
func resolveURL(sc *config.ServerConfig) (string, error) {
projectRoot, _ := findProjectRoot()
secrets := secret.NewKeyringStore()
res := resolver.New(projectRoot, secrets)
return res.Resolve(sc.URL)
}

// resolveServerConfig resolves $(var) patterns in a server config.
func resolveServerConfig(sc *config.ServerConfig) ([]string, map[string]string, error) {
projectRoot, _ := findProjectRoot()
Expand Down
3 changes: 3 additions & 0 deletions internal/cli/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func convertMCPJSON(mcp *mcpJSON) *mcpxConfig {

// Determine transport.
switch entry.Type {
case "streamable-http", "http":
sc.Transport = "http"
sc.URL = entry.URL
case "sse":
sc.Transport = "sse"
sc.URL = entry.URL
Expand Down
24 changes: 23 additions & 1 deletion internal/cli/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,33 @@ func (o *output) printResult(result *mcp.CallResult) error {
return o.printJSON(result)
}

// Pretty mode: print text content, one block per line.
// Pretty mode: print content blocks.
for _, c := range result.Content {
switch c.Type {
case "text":
fmt.Fprintln(o.stdout, c.Text)
case "image":
mime := c.MimeType
if mime == "" {
mime = "unknown"
}
fmt.Fprintf(o.stdout, "[image: %s, %d bytes base64]\n", mime, len(c.Data))
case "audio":
mime := c.MimeType
if mime == "" {
mime = "unknown"
}
fmt.Fprintf(o.stdout, "[audio: %s, %d bytes base64]\n", mime, len(c.Data))
case "resource":
if c.Resource != nil {
if c.Resource.Text != "" {
fmt.Fprintf(o.stdout, "[resource: %s]\n%s\n", c.Resource.URI, c.Resource.Text)
} else {
fmt.Fprintf(o.stdout, "[resource: %s, %d bytes base64]\n", c.Resource.URI, len(c.Resource.Blob))
}
} else {
fmt.Fprintln(o.stdout, "[resource content]")
}
default:
fmt.Fprintf(o.stdout, "[%s content]\n", c.Type)
}
Expand Down
1 change: 1 addition & 0 deletions internal/cli/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

// pingCmd creates the "ping" command for health-checking an MCP server.
// Health is verified by connecting (which includes Initialize) and listing tools.
func pingCmd(opts *globalOpts) *cobra.Command {
return &cobra.Command{
Use: "ping <server>",
Expand Down
12 changes: 12 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type ServerConfig struct {
Daemon bool `yaml:"daemon"`
StartupTimeout string `yaml:"startup_timeout"`
URL string `yaml:"url"`
Headers map[string]string `yaml:"headers"`
Auth *AuthConfig `yaml:"auth"`
}

// AuthConfig holds authentication settings for remote transports.
type AuthConfig struct {
Type string `yaml:"type"`
Token string `yaml:"token"`
}

// Load reads the global (~/.mcpx/config.yml) and project (.mcpx/config.yml)
Expand Down Expand Up @@ -133,6 +141,10 @@ func Validate(cfg *Config) error {
if sc.Command == "" {
return fmt.Errorf("config: server %q: command is required for stdio transport", name)
}
case "http":
if sc.URL == "" {
return fmt.Errorf("config: server %q: url is required for http transport", name)
}
case "sse":
if sc.URL == "" {
return fmt.Errorf("config: server %q: url is required for sse transport", name)
Expand Down
13 changes: 13 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,19 @@ func TestValidate(t *testing.T) {
"s": {Transport: "sse", URL: "http://localhost:8080"},
}},
},
{
name: "valid http",
cfg: &Config{Servers: map[string]*ServerConfig{
"s": {Transport: "http", URL: "https://mcp.example.com"},
}},
},
{
name: "missing url for http",
cfg: &Config{Servers: map[string]*ServerConfig{
"s": {Transport: "http"},
}},
wantErr: true,
},
{
name: "missing command for stdio",
cfg: &Config{Servers: map[string]*ServerConfig{
Expand Down
Loading
Loading