Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/rs/zerolog v1.34.0
github.com/spf13/cobra v1.9.1
github.com/spf13/viper v1.20.1
github.com/stretchr/testify v1.10.0
)

require (
Expand Down Expand Up @@ -57,7 +58,6 @@ require (
github.com/spf13/afero v1.14.0 // indirect
github.com/spf13/cast v1.9.2 // indirect
github.com/spf13/pflag v1.0.7 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/wlynxg/anet v0.0.5 // indirect
golang.org/x/crypto v0.40.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions server/internal/plugins/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/m1k1o/neko/server/internal/plugins/chat"
"github.com/m1k1o/neko/server/internal/plugins/filetransfer"
"github.com/m1k1o/neko/server/internal/plugins/scaletozero"
"github.com/m1k1o/neko/server/internal/plugins/telemetry"
"github.com/m1k1o/neko/server/pkg/types"
)

Expand Down Expand Up @@ -49,6 +50,7 @@ func New(config *config.Plugins) *ManagerCtx {
manager.plugins.addPlugin(filetransfer.NewPlugin())
manager.plugins.addPlugin(chat.NewPlugin())
manager.plugins.addPlugin(scaletozero.NewPlugin())
manager.plugins.addPlugin(telemetry.NewPlugin())

return manager
}
Expand Down
30 changes: 30 additions & 0 deletions server/internal/plugins/telemetry/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package telemetry

import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

type Config struct {
Enabled bool
Endpoint string
}

func (Config) Init(cmd *cobra.Command) error {
cmd.PersistentFlags().Bool("telemetry.enabled", false, "forward live-view session connect/disconnect events to kernel-images-api")
if err := viper.BindPFlag("telemetry.enabled", cmd.PersistentFlags().Lookup("telemetry.enabled")); err != nil {
return err
}

cmd.PersistentFlags().String("telemetry.endpoint", "http://127.0.0.1:10001/telemetry/events", "kernel-images-api telemetry publish endpoint")
if err := viper.BindPFlag("telemetry.endpoint", cmd.PersistentFlags().Lookup("telemetry.endpoint")); err != nil {
return err
}

return nil
}

func (c *Config) Set() {
c.Enabled = viper.GetBool("telemetry.enabled")
c.Endpoint = viper.GetString("telemetry.endpoint")
}
186 changes: 186 additions & 0 deletions server/internal/plugins/telemetry/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package telemetry

import (
"bytes"
"context"
"encoding/json"
"net/http"
"sync"
"time"

"github.com/m1k1o/neko/server/pkg/types"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

const (
queueDepth = 256
defaultHTTPTimeout = 5 * time.Second
)

type Manager struct {
logger zerolog.Logger
config *Config
sessions types.SessionManager
httpClient *http.Client

mu sync.Mutex
connectedAt map[string]time.Time

eventsCh chan eventPayload
stopCh chan struct{}
wg sync.WaitGroup
}

func NewManager(sessions types.SessionManager, config *Config) *Manager {
return &Manager{
logger: log.With().Str("module", "telemetry").Logger(),
config: config,
sessions: sessions,
httpClient: &http.Client{Timeout: defaultHTTPTimeout},
connectedAt: make(map[string]time.Time),
eventsCh: make(chan eventPayload, queueDepth),
stopCh: make(chan struct{}),
}
}

func (m *Manager) Start() error {
if !m.config.Enabled {
return nil
}
m.logger.Info().Str("endpoint", m.config.Endpoint).Msg("plugin enabled")

m.wg.Add(1)
go m.worker()

m.sessions.OnConnected(func(session types.Session) {
m.handleConnect(session.ID())
})
m.sessions.OnDisconnected(func(session types.Session) {
m.handleDisconnect(session.ID())
})

return nil
}

func (m *Manager) Shutdown() error {
if !m.config.Enabled {
return nil
}
close(m.stopCh)
m.wg.Wait()
return nil
}

func (m *Manager) handleConnect(id string) {
m.mu.Lock()
m.connectedAt[id] = time.Now()
m.mu.Unlock()

m.enqueue(eventPayload{
Type: "live_view_connect",
SourceEvent: "neko.session.connected",
Data: map[string]any{"session_id": id},
})
}

func (m *Manager) handleDisconnect(id string) {
m.mu.Lock()
start, ok := m.connectedAt[id]
delete(m.connectedAt, id)
m.mu.Unlock()

var durationMs float64
if ok {
durationMs = float64(time.Since(start).Microseconds()) / 1000.0
}

m.enqueue(eventPayload{
Type: "live_view_disconnect",
SourceEvent: "neko.session.disconnected",
Data: map[string]any{"session_id": id, "duration_ms": durationMs},
})
}

func (m *Manager) enqueue(ev eventPayload) {
select {
case m.eventsCh <- ev:
default:
// Drop rather than block neko's session goroutines. A backed-up
// kernel-images-api means we'd lose lifecycle pairs anyway.
m.logger.Warn().Str("type", ev.Type).Msg("telemetry queue full; dropping event")
}
}

func (m *Manager) worker() {
defer m.wg.Done()
for {
select {
case <-m.stopCh:
// Best-effort drain on shutdown so we don't lose paired
// connect/disconnects when neko exits cleanly.
for {
select {
case ev := <-m.eventsCh:
m.publish(ev)
default:
return
}
}
case ev := <-m.eventsCh:
m.publish(ev)
}
}
}

func (m *Manager) publish(ev eventPayload) {
body := publishBody{
Type: ev.Type,
Category: "system",
Source: publishSource{
Kind: "local_process",
Event: ev.SourceEvent,
},
Data: ev.Data,
}
raw, err := json.Marshal(body)
if err != nil {
m.logger.Warn().Err(err).Msg("marshal telemetry body failed")
return
}
ctx, cancel := context.WithTimeout(context.Background(), defaultHTTPTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, m.config.Endpoint, bytes.NewReader(raw))
if err != nil {
m.logger.Warn().Err(err).Msg("build telemetry request failed")
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := m.httpClient.Do(req)
if err != nil {
m.logger.Debug().Err(err).Str("type", ev.Type).Msg("telemetry POST failed")
return
}
_ = resp.Body.Close()
if resp.StatusCode/100 != 2 {
m.logger.Debug().Int("status", resp.StatusCode).Str("type", ev.Type).Msg("telemetry POST non-2xx")
}
}

type eventPayload struct {
Type string
SourceEvent string
Data map[string]any
}

type publishSource struct {
Kind string `json:"kind"`
Event string `json:"event"`
}

type publishBody struct {
Type string `json:"type"`
Category string `json:"category"`
Source publishSource `json:"source"`
Data map[string]any `json:"data"`
}
Loading
Loading