Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions cmd/collect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package cmd

import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/promptconduit/cli/internal/collect"
"github.com/spf13/cobra"
)

var (
collectOTLPAddr string
collectDashboardAddr string
collectDir string
)

var collectCmd = &cobra.Command{
Use: "collect",
Short: "Run a local OTLP receiver, store, and dashboard for agent traces",
SilenceUsage: true,
SilenceErrors: true,
Long: `Start a local-first agent observability stack:

- An OTLP/HTTP receiver on 127.0.0.1:4318 (paths /v1/traces, /v1/logs).
- A newline-delimited JSON store under ~/.config/promptconduit/collect/.
- A read-only HTML+JSON dashboard on 127.0.0.1:4319.

This is the "weekend dogfood" stack: zero infra, point any OTLP-emitting
agent (Claude Code, Cursor on a recent build, anything using OpenInference
or OpenLLMetry) at the receiver and watch traces accumulate. The receiver
speaks OTLP/HTTP+JSON only — protobuf is on the roadmap.

Pointing Claude Code at the local receiver:

export CLAUDE_CODE_ENABLE_TELEMETRY=1
export OTEL_EXPORTER_OTLP_PROTOCOL=http/json
export OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318

Examples:
promptconduit collect # run with defaults
promptconduit collect --otlp :4318 # bind on all interfaces
promptconduit collect --dir /tmp/pc-traces # store somewhere else`,
RunE: runCollect,
}

func init() {
collectCmd.Flags().StringVar(&collectOTLPAddr, "otlp", "127.0.0.1:4318", "address for the OTLP/HTTP receiver")
collectCmd.Flags().StringVar(&collectDashboardAddr, "dashboard", "127.0.0.1:4319", "address for the read-only dashboard")
collectCmd.Flags().StringVar(&collectDir, "dir", "", "directory for NDJSON span files (defaults to ~/.config/promptconduit/collect)")
}

func runCollect(cmd *cobra.Command, args []string) error {
ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGTERM)
defer cancel()

srv, err := collect.New(collect.Options{
OTLPAddr: collectOTLPAddr,
DashboardAddr: collectDashboardAddr,
StoreDir: collectDir,
})
if err != nil {
return fmt.Errorf("init collector: %w", err)
}

fmt.Fprintf(cmd.ErrOrStderr(), "OTLP receiver: http://%s/v1/traces\n", collectOTLPAddr)
fmt.Fprintf(cmd.ErrOrStderr(), "Dashboard: http://%s\n", collectDashboardAddr)
fmt.Fprintf(cmd.ErrOrStderr(), "Store: %s\n", srv.StoreDir())
fmt.Fprintln(cmd.ErrOrStderr(), "Ctrl-C to stop.")

return srv.Run(ctx)
}
9 changes: 4 additions & 5 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func init() {
rootCmd.AddCommand(debugCmd)
rootCmd.AddCommand(upgradeCmd)
rootCmd.AddCommand(watchCmd)
rootCmd.AddCommand(collectCmd)
}

var versionCmd = &cobra.Command{
Expand Down Expand Up @@ -188,17 +189,15 @@ func spawnBackgroundUpgrade() {

// skipUpdateCheckFor returns true when the current command must not pay
// the cost of (or output) an update check. Hooks run many times per
// session and must stay fast; upgrade does its own checking.
// session and must stay fast; upgrade does its own checking; long-running
// loops shouldn't be interrupted with an update banner.
func skipUpdateCheckFor(cmd *cobra.Command) bool {
if os.Getenv("PROMPTCONDUIT_AUTO_UPDATE_CHILD") == "1" {
return true
}
name := commandPathRoot(cmd)
switch name {
case "hook", "upgrade", "watch":
// hook is per-event and must stay fast; upgrade and watch
// drive their own long-running loops and shouldn't have a
// random update banner interrupt them.
case "hook", "upgrade", "watch", "collect":
return true
}
return false
Expand Down
74 changes: 74 additions & 0 deletions internal/collect/dashboard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package collect

import (
_ "embed"
"encoding/json"
"net/http"
"strconv"
)

//go:embed ui/index.html
var dashboardHTML []byte

// mountDashboard registers the dashboard routes onto mux.
//
// GET / → embedded HTML
// GET /api/traces → JSON list of recent traces (?limit=N)
// GET /api/spans → JSON list of recent spans (?trace_id=..&limit=N)
func mountDashboard(mux *http.ServeMux, store *Store) {
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_, _ = w.Write(dashboardHTML)
})

mux.HandleFunc("/api/traces", func(w http.ResponseWriter, r *http.Request) {
limit := intParam(r, "limit", 50)
traces, err := store.ListTraces(limit)
writeJSON(w, traces, err)
})

mux.HandleFunc("/api/spans", func(w http.ResponseWriter, r *http.Request) {
limit := intParam(r, "limit", 500)
traceID := r.URL.Query().Get("trace_id")
spans, err := store.ReadSpans(limit, traceID)
writeJSON(w, spans, err)
})
}

func intParam(r *http.Request, name string, def int) int {
v := r.URL.Query().Get(name)
if v == "" {
return def
}
n, err := strconv.Atoi(v)
if err != nil || n <= 0 {
return def
}
return n
}

func writeJSON(w http.ResponseWriter, body any, err error) {
w.Header().Set("Content-Type", "application/json")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
switch v := body.(type) {
case []SpanRow:
if v == nil {
_, _ = w.Write([]byte("[]\n"))
return
}
case []TraceSummary:
if v == nil {
_, _ = w.Write([]byte("[]\n"))
return
}
}
_ = json.NewEncoder(w).Encode(body)
}
Loading
Loading