Skip to content
This repository was archived by the owner on May 1, 2026. It is now read-only.

feat: enhance log streaming with timeout and direct SSE support#476

Closed
WcaleNieWolny wants to merge 4 commits intomainfrom
support_direct_sse_logs
Closed

feat: enhance log streaming with timeout and direct SSE support#476
WcaleNieWolny wants to merge 4 commits intomainfrom
support_direct_sse_logs

Conversation

@WcaleNieWolny
Copy link
Copy Markdown
Contributor

@WcaleNieWolny WcaleNieWolny commented Jan 27, 2026

Changes

  • Introduced a readWithTimeout function to handle read operations with a specified timeout.
  • Updated streamBuildLogs to support direct SSE streaming using a provided URL and token.
  • Improved error handling and reconnection logic for log streaming.
  • Adjusted parameters for streamBuildLogs to accept new arguments for direct log streaming.
  • Enhanced logging for better visibility during connection attempts and failures.

Motivation

Martin told me to improve the Capgo Native build. And I believe that by removing the complex proxy system around server-side events we decrease the cost of running Capgo Native builder as well as provide a more reliable experience for the end user.

Business impact

Low/medium

Removing this complex proxy system decreases the cost as well as the complexity of Capgo Builder, which positively improves the quality of code as well as decreases the costs. Furthermore, it improves reliability of the system for the user.

Summary by CodeRabbit

  • New Features

    • Implemented automatic reconnection logic for build log streaming with improved retry behavior.
    • Enhanced detection and display of build final status messages.
  • Bug Fixes

    • Improved timeout handling during log streaming to prevent connection hangs.
    • Better error recovery for network failures and authorization issues.

✏️ Tip: You can customize this high-level summary in your review settings.

- Introduced a `readWithTimeout` function to handle read operations with a specified timeout.
- Updated `streamBuildLogs` to support direct SSE streaming using a provided URL and token.
- Improved error handling and reconnection logic for log streaming.
- Adjusted parameters for `streamBuildLogs` to accept new arguments for direct log streaming.
- Enhanced logging for better visibility during connection attempts and failures.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jan 27, 2026

📝 Walkthrough

Walkthrough

The streamBuildLogs function in src/build/request.ts was refactored to use direct log URLs and tokens instead of deriving them from host/job parameters. A new readWithTimeout helper function was added to enforce read timeouts, and the streaming logic was enhanced with reconnection attempts, improved SSE parsing with line buffering, and final status detection.

Changes

Cohort / File(s) Change Summary
Direct Log Streaming Refactor
src/build/request.ts
Signature of streamBuildLogs changed from host/jobId-based parameters to direct URL/token approach (directLogsUrl, directLogsToken). New readWithTimeout helper enforces read timeouts. Streaming logic now includes timeout handling, reconnection loop with configurable delays and max attempts, SSE line parsing with buffering, final status detection, and enhanced error handling. Start response type updated to include logs_url and logs_token fields.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 Hop, hop, hooray! The streams flow true,
With timeouts set and reconnects too,
Direct paths chosen, no more detours required,
SSE lines parsed, resilience inspired,
Logs arrive swift, connection's secured! 📡

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 75.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main changes: adding timeout support to log streaming and implementing direct SSE (Server-Sent Events) support, which are the primary enhancements reflected in the changeset.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@WcaleNieWolny
Copy link
Copy Markdown
Contributor Author

@copilot review please :)

The AbortController was created but never used to abort the fetch.
Simplified the code by removing the dead code.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request enhances the build log streaming functionality by removing the complex proxy system and implementing direct Server-Sent Events (SSE) streaming with improved resilience. The changes introduce timeout handling, automatic reconnection with sequence tracking for resumption, and better error handling for various failure scenarios.

Changes:

  • Introduced readWithTimeout function to detect stale connections and trigger reconnection
  • Refactored streamBuildLogs to accept direct SSE endpoint URL and JWT token instead of constructing the endpoint from host/job/app parameters
  • Implemented reconnection logic with sequence tracking to resume from last received message
  • Enhanced error handling to differentiate between temporary failures (retry) and terminal errors (don't retry)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/build/request.ts
Comment on lines +287 to +396
while (reconnectAttempts < maxReconnectAttempts) {
if (finalStatus) {
break
}
// Direct SSE: Connect directly to CF Worker with JWT token
let logUrl = `${directLogsUrl}?token=${encodeURIComponent(directLogsToken)}`
if (lastSequence >= 0) {
logUrl += `&last_sequence=${lastSequence}`
}

const reader = response.body?.getReader()
if (!reader) {
log.warn('No response body for log stream')
return null
if (reconnectAttempts === 0) {
log.info('Connecting to log stream...')
}
else {
log.info(`Reconnecting to log stream (attempt ${reconnectAttempts + 1}/${maxReconnectAttempts})...`)
}

log.info('Reading log stream...')
let reader: ReadableStreamDefaultReader<Uint8Array> | null = null

const decoder = new TextDecoder()
let buffer = '' // Buffer for incomplete lines
try {
const response = await fetch(logUrl)

// Helper to process and display a log message
const processLogMessage = (message: string) => {
if (!message.trim())
return
if (!response.ok) {
const errorText = await response.text().catch(() => 'unknown error')

// Check for final status messages from the server
// Server sends "Build succeeded", "Build failed", "Job already succeeded", etc.
const statusMatch = message.match(/^(?:Build|Job already) (succeeded|failed|expired|released|cancelled)$/i)
if (statusMatch) {
finalStatus = statusMatch[1].toLowerCase()
// Don't display status messages as log lines - they'll be displayed as final status
return
// If we get 404, the job might be done - don't retry
if (response.status === 404) {
log.warn(`Log stream not found (job may have completed)`)
return null
}
if (response.status === 401 || response.status === 403) {
log.warn(`Log stream authorization failed (${response.status})`)
return null
}
log.warn(`Could not stream logs (${response.status}): ${errorText}`)
reconnectAttempts++
await new Promise(resolve => setTimeout(resolve, reconnectDelayMs))
continue
}

// Don't display logs after we've received a final status (e.g., cleanup messages after failure)
if (finalStatus)
return

// Print log line directly to console (no spinner to avoid _events errors)
if (!hasReceivedLogs) {
hasReceivedLogs = true
log.info('') // Add blank line before first log
reader = response.body?.getReader() ?? null
if (!reader) {
log.warn('No response body for log stream')
reconnectAttempts++
await new Promise(resolve => setTimeout(resolve, reconnectDelayMs))
continue
}
log.info(message)
}

while (true) {
const { done, value } = await reader.read()
if (done)
break
// Successfully connected - reset reconnect attempts
reconnectAttempts = 0

buffer += decoder.decode(value, { stream: true })
const decoder = new TextDecoder()
let buffer = '' // Buffer for incomplete lines

// Process complete lines (SSE format: "data: message\n\n")
const lines = buffer.split('\n')
// Keep the last incomplete line in the buffer
buffer = lines.pop() || ''
while (true) {
// Use timeout to detect stale connections quickly
const { done, value } = await readWithTimeout(reader, readTimeoutMs)
if (done) {
// Stream ended normally
break
}

for (const line of lines) {
if (line.startsWith('data: ')) {
const message = line.slice(6) // Remove "data: " prefix
processLogMessage(message)
buffer += decoder.decode(value, { stream: true })

// Process complete lines (SSE format: "data: message\n\n" or "id: N\ndata: message\n\n")
const lines = buffer.split('\n')
// Keep the last incomplete line in the buffer
buffer = lines.pop() || ''

for (const line of lines) {
// Track sequence ID if present (for resumption)
if (line.startsWith('id: ')) {
const seqStr = line.slice(4).trim()
const seq = Number.parseInt(seqStr, 10)
if (!Number.isNaN(seq)) {
lastSequence = seq
}
}
// Process data lines
else if (line.startsWith('data: ')) {
const message = line.slice(6) // Remove "data: " prefix
const gotFinalStatus = processLogMessage(message)
if (gotFinalStatus) {
// Got final status, we're done
reader.cancel().catch(() => {})
return finalStatus
}
}
// Ignore keepalive comments (lines starting with :)
}
}
}

// Process any remaining data in buffer
if (buffer.startsWith('data: ')) {
const message = buffer.slice(6)
processLogMessage(message)
// Process any remaining data in buffer
if (buffer.startsWith('data: ')) {
const message = buffer.slice(6)
processLogMessage(message)
}

// Stream ended normally - if we have final status, return it
if (finalStatus) {
return finalStatus
}

// Stream ended without final status - might need to reconnect
// But first check if this was a clean close (job might be done)
log.info('Log stream ended, checking if more data available...')
reconnectAttempts++
await new Promise(resolve => setTimeout(resolve, reconnectDelayMs))
}
catch (err) {
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reconnection logic can potentially loop indefinitely if the server repeatedly accepts connections but closes the stream without sending a final status message. The reconnectAttempts counter is reset to 0 on line 339 after each successful connection, then incremented to 1 on line 396 when the stream ends. If the next connection succeeds, it resets to 0 again. This means the function will never hit maxReconnectAttempts as long as connections keep succeeding, even if none of them produce a final status. Consider whether reconnectAttempts should track total loop iterations rather than just connection failures, or add a separate counter for total attempts regardless of connection success.

Copilot uses AI. Check for mistakes.
Comment thread src/build/request.ts
break
}
// Direct SSE: Connect directly to CF Worker with JWT token
let logUrl = `${directLogsUrl}?token=${encodeURIComponent(directLogsToken)}`
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JWT token is being passed as a URL query parameter, which can expose it in server logs, browser history, and referrer headers. Consider using a more secure method such as passing the token in an Authorization header instead. This is especially important for JWT tokens that may have longer validity periods.

Copilot uses AI. Check for mistakes.
Comment thread src/build/request.ts
Comment on lines +307 to +310
const response = await fetch(logUrl)

// Helper to process and display a log message
const processLogMessage = (message: string) => {
if (!message.trim())
return
if (!response.ok) {
const errorText = await response.text().catch(() => 'unknown error')
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AbortController is created but never used. The signal is passed to fetch, but the controller's abort() method is never called, meaning the controller serves no purpose. Either remove the controller if it's not needed, or call controller.abort() when cleaning up the reader on timeout or error to properly cancel the fetch request and release resources.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already fixed, check again

@sonarqubecloud
Copy link
Copy Markdown

@riderx
Copy link
Copy Markdown
Member

riderx commented Feb 3, 2026

@WcaleNieWolny mergeable?

@riderx
Copy link
Copy Markdown
Member

riderx commented Feb 16, 2026

will not merge we found better solution

@riderx riderx closed this Feb 16, 2026
@riderx riderx deleted the support_direct_sse_logs branch March 20, 2026 08:14
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants