Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/cmd/api/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

"github.com/kernel/kernel-images/server/lib/cdpmonitor"
"github.com/kernel/kernel-images/server/lib/devtoolsproxy"
"github.com/kernel/kernel-images/server/lib/telemetry"
"github.com/kernel/kernel-images/server/lib/events"
"github.com/kernel/kernel-images/server/lib/logger"
"github.com/kernel/kernel-images/server/lib/nekoclient"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
"github.com/kernel/kernel-images/server/lib/policy"
"github.com/kernel/kernel-images/server/lib/recorder"
"github.com/kernel/kernel-images/server/lib/scaletozero"
"github.com/kernel/kernel-images/server/lib/telemetry"
)

type cdpMonitorController interface {
Expand Down Expand Up @@ -99,7 +99,7 @@ func New(
stz scaletozero.PinnedController,
nekoAuthClient *nekoclient.AuthClient,
telemetrySession *telemetry.TelemetrySession,
eventStream *events.EventStream,
eventStream *events.EventStream,
displayNum int,
) (*ApiService, error) {
switch {
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"log/slog"

"github.com/kernel/kernel-images/server/lib/devtoolsproxy"
"github.com/kernel/kernel-images/server/lib/telemetry"
"github.com/kernel/kernel-images/server/lib/events"
"github.com/kernel/kernel-images/server/lib/nekoclient"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
"github.com/kernel/kernel-images/server/lib/recorder"
"github.com/kernel/kernel-images/server/lib/scaletozero"
"github.com/kernel/kernel-images/server/lib/telemetry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down
26 changes: 13 additions & 13 deletions server/cmd/api/api/computer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestGaussianDelay_WelfordVelocityVariance(t *testing.T) {
for i := range distances {
t_norm := float64(i) / float64(steps)
base := 5.0 + 15.0*math.Sin(t_norm*math.Pi) // 5-20px, peaked in middle
distances[i] = base + rng.Float64()*3.0 // small random variation
distances[i] = base + rng.Float64()*3.0 // small random variation
}

// Gaussian delays → velocity variance
Expand Down Expand Up @@ -302,27 +302,27 @@ func TestClampPoints(t *testing.T) {
expected [][2]int
}{
{
name: "no clamping needed",
points: [][2]int{{10, 20}, {50, 50}, {100, 80}},
w: 200, h: 200,
name: "no clamping needed",
points: [][2]int{{10, 20}, {50, 50}, {100, 80}},
w: 200, h: 200,
expected: [][2]int{{10, 20}, {50, 50}, {100, 80}},
},
{
name: "clamp negative x and y",
points: [][2]int{{-10, -20}, {50, 50}},
w: 200, h: 200,
name: "clamp negative x and y",
points: [][2]int{{-10, -20}, {50, 50}},
w: 200, h: 200,
expected: [][2]int{{0, 0}, {50, 50}},
},
{
name: "clamp exceeding screen bounds",
points: [][2]int{{50, 50}, {250, 300}},
w: 200, h: 200,
name: "clamp exceeding screen bounds",
points: [][2]int{{50, 50}, {250, 300}},
w: 200, h: 200,
expected: [][2]int{{50, 50}, {199, 199}},
},
{
name: "clamp both directions",
points: [][2]int{{-5, 250}, {300, -10}, {100, 100}},
w: 200, h: 200,
name: "clamp both directions",
points: [][2]int{{-5, 250}, {300, -10}, {100, 100}},
w: 200, h: 200,
expected: [][2]int{{0, 199}, {199, 0}, {100, 100}},
},
}
Expand Down
3 changes: 1 addition & 2 deletions server/cmd/api/api/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"strings"
"time"

nekooapi "github.com/m1k1o/neko/server/lib/oapi"
"github.com/kernel/kernel-images/server/lib/cdpclient"
"github.com/kernel/kernel-images/server/lib/logger"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
"github.com/kernel/kernel-images/server/lib/recorder"
nekooapi "github.com/m1k1o/neko/server/lib/oapi"
)

// PatchDisplay updates the display configuration. When require_idle
Expand Down Expand Up @@ -655,4 +655,3 @@ func (s *ApiService) setResolutionViaNeko(ctx context.Context, width, height, re
log.Info("successfully changed resolution via Neko API", "width", width, "height", height, "refresh_rate", refreshRate)
return nil
}

2 changes: 1 addition & 1 deletion server/cmd/api/api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
"os/user"

"github.com/fsnotify/fsnotify"
"github.com/nrednav/cuid2"
"github.com/kernel/kernel-images/server/lib/logger"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
"github.com/kernel/kernel-images/server/lib/ziputil"
"github.com/kernel/kernel-images/server/lib/zstdutil"
"github.com/nrednav/cuid2"
)

// fsWatch represents an in-memory directory watch.
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/coder/websocket"
"github.com/creack/pty"
"github.com/google/uuid"
openapi_types "github.com/oapi-codegen/runtime/types"
"github.com/kernel/kernel-images/server/lib/logger"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
"github.com/kernel/kernel-images/server/lib/ptyio"
openapi_types "github.com/oapi-codegen/runtime/types"
)

type processHandle struct {
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"time"

"github.com/google/uuid"
openapi_types "github.com/oapi-codegen/runtime/types"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
"github.com/kernel/kernel-images/server/lib/scaletozero"
openapi_types "github.com/oapi-codegen/runtime/types"
"github.com/stretchr/testify/require"
)

Expand Down
1 change: 0 additions & 1 deletion server/cmd/chromium-launcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func main() {
}
}


// execLookPath helps satisfy syscall.Exec's requirement to pass an absolute path.
func execLookPath(file string) (string, error) {
if strings.ContainsRune(file, os.PathSeparator) {
Expand Down
1 change: 0 additions & 1 deletion server/cmd/chromium-launcher/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,3 @@ func TestExecLookPath(t *testing.T) {
t.Fatalf("execLookPath PATH search failed: p=%q err=%v", p, err)
}
}

2 changes: 1 addition & 1 deletion server/cmd/shell/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"time"

"github.com/google/uuid"
openapi_types "github.com/oapi-codegen/runtime/types"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
openapi_types "github.com/oapi-codegen/runtime/types"
"golang.org/x/term"
)

Expand Down
1 change: 0 additions & 1 deletion server/cmd/wrapper/chromium.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,3 @@ func applyHeadlessDefaultFlags() {
}, " ")
_ = os.Setenv("CHROMIUM_FLAGS", flags)
}

4 changes: 2 additions & 2 deletions server/cmd/wrapper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ func main() {

// waitAllReady gates on all caller-visible ready signals concurrently:
// - cdp : HTTP /json/version on the public CDP port (proves api proxy is
// wired through to chromium's DevTools server)
// wired through to chromium's DevTools server)
// - chromedriver : TCP on chromedriver's internal port 9225 (api on 9224 is bound
// when api itself is up, which CDP readiness already implies)
// when api itself is up, which CDP readiness already implies)
// - neko : TCP on neko's HTTP port (8080), only when ENABLE_WEBRTC=true
// - envoy : TCP on envoy's listener (3128), only when envoy is enabled
func waitAllReady(t0 time.Time, webrtc bool) map[string]time.Duration {
Expand Down
2 changes: 1 addition & 1 deletion server/e2e/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (c *TestContainer) ExitCh() <-chan error {
// WaitDevTools waits for the CDP WebSocket endpoint to be ready.
func (c *TestContainer) WaitDevTools(ctx context.Context) error {
return wait.ForListeningPort(nat.Port("9222/tcp")).
WithStartupTimeout(2 * time.Minute).
WithStartupTimeout(2*time.Minute).
WaitUntilReady(ctx, c.ctr)
}

Expand Down
22 changes: 11 additions & 11 deletions server/lib/cdpclient/cdpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ import (
// SetDeviceMetricsOverride: Target.getTargets, Target.attachToTarget,
// Emulation.setDeviceMetricsOverride, and Target.detachFromTarget.
type fakeCDP struct {
getTargetsCalled bool
attachCalled bool
setMetricsCalled bool
setMetricsWidth int
setMetricsHeight int
detachCalled bool
pageTargetID string
sessionID string
failGetTargets bool
failSetMetrics bool
returnNoPageTargets bool
getTargetsCalled bool
attachCalled bool
setMetricsCalled bool
setMetricsWidth int
setMetricsHeight int
detachCalled bool
pageTargetID string
sessionID string
failGetTargets bool
failSetMetrics bool
returnNoPageTargets bool
}

func (f *fakeCDP) handler(w http.ResponseWriter, r *http.Request) {
Expand Down
1 change: 0 additions & 1 deletion server/lib/cdpmonitor/computed.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (s *computedState) navDataWith(extra map[string]any) json.RawMessage {
return out
}


// currentNavCtxFields returns the current nav context fields for constructing typed event payloads.
// Returns zero values if s is nil (before first navigation).
func (s *computedState) currentNavCtxFields() (sessionID, targetID, targetType, frameID, loaderID, url string, navSeq int64) {
Expand Down
28 changes: 14 additions & 14 deletions server/lib/cdpmonitor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ const mainSessionUnset = "\x00unset"
// Each maps 1-to-1 with a specific CDP domain event (Runtime.*, Network.*,
// Page.*, PerformanceTimeline.*) received from Chrome.
const (
EventConsoleLog = "console_log" // Runtime.consoleAPICalled (non-error types)
EventConsoleError = "console_error" // Runtime.consoleAPICalled (type=error) or Runtime.exceptionThrown
EventNetworkRequest = "network_request" // Network.requestWillBeSent
EventNetworkResponse = "network_response" // Network.loadingFinished (with prior responseReceived)
EventNetworkLoadingFailed = "network_loading_failed" // Network.loadingFailed
EventNavigation = "page_navigation" // Page.frameNavigated
EventConsoleLog = "console_log" // Runtime.consoleAPICalled (non-error types)
EventConsoleError = "console_error" // Runtime.consoleAPICalled (type=error) or Runtime.exceptionThrown
EventNetworkRequest = "network_request" // Network.requestWillBeSent
EventNetworkResponse = "network_response" // Network.loadingFinished (with prior responseReceived)
EventNetworkLoadingFailed = "network_loading_failed" // Network.loadingFailed
EventNavigation = "page_navigation" // Page.frameNavigated
EventDOMContentLoaded = "page_dom_content_loaded" // Page.domContentEventFired
EventPageLoad = "page_load" // Page.loadEventFired
EventLayoutShift = "page_layout_shift" // PerformanceTimeline event of type "layout-shift"
Expand All @@ -33,27 +33,27 @@ const (
// None of these correspond to a single CDP notification; they are inferred from
// sequences of CDP events and debounce timers.
const (
EventNetworkIdle = "network_idle" // 500 ms after all in-flight requests finish
EventLayoutSettled = "page_layout_settled" // 1 s after page_load with no intervening layout shifts
EventNetworkIdle = "network_idle" // 500 ms after all in-flight requests finish
EventLayoutSettled = "page_layout_settled" // 1 s after page_load with no intervening layout shifts
EventNavigationSettled = "page_navigation_settled" // fires once page_dom_content_loaded and page_layout_settled both hold
)

// Interaction events — fired by injected page-side JS (interaction.js) via the
// Runtime.bindingCalled mechanism. They originate in the browser's renderer
// process, not from Chrome's network or page domains.
const (
EventInteractionClick = "interaction_click" // document click (target selector, coords, text)
EventInteractionKey = "interaction_key" // keydown (key name, target selector)
EventInteractionClick = "interaction_click" // document click (target selector, coords, text)
EventInteractionKey = "interaction_key" // keydown (key name, target selector)
EventScrollSettled = "interaction_scroll_settled" // 300 ms after the last scroll event on a target
)

// Monitor lifecycle and internal events — emitted by the monitor itself, not by Chrome.
const (
EventScreenshot = "monitor_screenshot" // ffmpeg frame capture on page load or JS exception
EventMonitorDisconnected = "monitor_disconnected" // WebSocket to Chrome closed unexpectedly
EventMonitorReconnected = "monitor_reconnected" // successfully reconnected after a disconnect
EventScreenshot = "monitor_screenshot" // ffmpeg frame capture on page load or JS exception
EventMonitorDisconnected = "monitor_disconnected" // WebSocket to Chrome closed unexpectedly
EventMonitorReconnected = "monitor_reconnected" // successfully reconnected after a disconnect
EventMonitorReconnectFailed = "monitor_reconnect_failed" // reconnect attempts exhausted
EventMonitorInitFailed = "monitor_init_failed" // could not initialise the CDP session
EventMonitorInitFailed = "monitor_init_failed" // could not initialise the CDP session
)

// Metadata keys written into events.Source.Metadata for CDP-sourced events.
Expand Down
89 changes: 52 additions & 37 deletions server/lib/devtoolsproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,47 +371,12 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMess

logger.Debug("proxying websocket", slog.String("url", upstreamURL))

// Cancel the pump when the upstream URL changes (Chromium restarted),
// forcing the client to reconnect with the new upstream.
pumpCtx, pumpCancel := context.WithCancel(r.Context())

// Set by the URL-watcher when it tears down the pump; cleanup falls
// back to client_close otherwise.
var reasonOverride atomic.Pointer[oapi.BrowserCdpDisconnectEventDataReason]

go func(currentUpstreamURL string) {
for {
select {
case newURL, ok := <-urlCh:
if !ok {
return
}
newURL = normalizeUpstreamURL(newURL)
if newURL == "" || newURL == currentUpstreamURL {
continue
}
logger.Info("upstream URL changed, closing stale proxy session",
slog.String("old_url", currentUpstreamURL),
slog.String("new_url", newURL))
reason := oapi.UpstreamChanged
reasonOverride.CompareAndSwap(nil, &reason)
pumpCancel()
return
case <-pumpCtx.Done():
return
}
}
}(upstreamURL)

var once sync.Once
cleanup := func() {
cleanup := func(cause wsproxy.PumpExitCause) {
once.Do(func() {
reason := oapi.ClientClose
if rp := reasonOverride.Load(); rp != nil {
reason = *rp
} else if r.Context().Err() != nil {
reason = oapi.ContextCancelled
}
reason := resolveDisconnectReason(cause, r.Context(), mgr, urlCh, upstreamURL, restartConfirmWait, logger)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a behavioral change, not just a telemetry change. Previously, the URL watcher canceled the pump as soon as `UpstreamManager` published a different DevTools URL, forcing clients off stale upstream sessions. With this change, `urlCh` is only checked during dial/reason resolution after the pump exits, so a session can stay attached to the old upstream until the websocket itself errors or the request context is canceled. Could we preserve the watcher for stale-session cancellation while keeping the new cause-based reason resolution?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am wrong, but I am just double-checking here.

pumpCancel()
upstreamConn.Close(websocket.StatusNormalClosure, "")
clientConn.Close(websocket.StatusNormalClosure, "")
Expand All @@ -423,6 +388,56 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMess
})
}

// restartConfirmWait is how long cleanup waits for a new upstream URL after
// the upstream side of the pump dies before classifying the disconnect as
// upstream_error vs upstream_changed. Sized for Chromium's typical cold
// restart (~5-8s on Unikraft Cloud) with headroom. var (not const) so tests
// can temporarily shrink it.
//
// The proxy handler's cleanup passes this value to resolveDisconnectReason as
// its restartWait argument, so it is an upper bound on how long that call can
// block while deciding the disconnect reason.
var restartConfirmWait = 10 * time.Second

// resolveDisconnectReason classifies why a proxied CDP session ended, based on
// which side made the pump exit.
//
// Resolution order:
//   - a cancelled request context, or a pump that exited because of its
//     context, yields context_cancelled;
//   - a pump that exited on the client side yields client_close;
//   - otherwise, if the manager already reports a URL that differs from
//     dialedURL, the result is upstream_changed;
//   - otherwise it waits up to restartWait on urlCh: a different, non-empty URL
//     yields upstream_changed, while a closed channel or an expired timer
//     yields upstream_error. Cancellation of reqCtx during the wait yields
//     context_cancelled.
//
// The wait uses one timer for the whole call, so broadcasts that repeat the
// dialed URL (or normalize to empty) never extend the total wait.
func resolveDisconnectReason(cause wsproxy.PumpExitCause, reqCtx context.Context, mgr *UpstreamManager, urlCh <-chan string, dialedURL string, restartWait time.Duration, logger *slog.Logger) oapi.BrowserCdpDisconnectEventDataReason {
	switch {
	case reqCtx.Err() != nil, cause == wsproxy.PumpExitContext:
		return oapi.ContextCancelled
	case cause == wsproxy.PumpExitClient:
		return oapi.ClientClose
	}

	// isNewUpstream reports whether a normalized URL names an upstream other
	// than the one this session dialed.
	isNewUpstream := func(candidate string) bool {
		return candidate != "" && candidate != dialedURL
	}

	// Fast path: the manager may already know about a restarted upstream.
	if current := normalizeUpstreamURL(mgr.Current()); isNewUpstream(current) {
		logger.Info("upstream changed before disconnect resolution",
			slog.String("old_url", dialedURL), slog.String("new_url", current))
		return oapi.UpstreamChanged
	}

	// Slow path: give the upstream a bounded window to announce a new URL.
	deadline := time.NewTimer(restartWait)
	defer deadline.Stop()

	for {
		select {
		case <-reqCtx.Done():
			return oapi.ContextCancelled
		case <-deadline.C:
			return oapi.UpstreamError
		case announced, open := <-urlCh:
			if !open {
				return oapi.UpstreamError
			}
			if candidate := normalizeUpstreamURL(announced); isNewUpstream(candidate) {
				logger.Info("upstream restart confirmed after disconnect",
					slog.String("old_url", dialedURL), slog.String("new_url", candidate))
				return oapi.UpstreamChanged
			}
			// Stale or duplicate broadcast: keep waiting on the same timer.
		}
	}
}

func publishCdpConnect(publish EventPublisher) {
if publish == nil {
return
Expand Down
Loading
Loading