feat: enhance log streaming with timeout and direct SSE support #476
WcaleNieWolny wants to merge 4 commits into main from
Conversation
- Introduced a `readWithTimeout` function to handle read operations with a specified timeout.
- Updated `streamBuildLogs` to support direct SSE streaming using a provided URL and token.
- Improved error handling and reconnection logic for log streaming.
- Adjusted parameters for `streamBuildLogs` to accept new arguments for direct log streaming.
- Enhanced logging for better visibility during connection attempts and failures.
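The SSE handling described above — buffering incomplete lines, tracking `id:` sequence numbers for resumption, and extracting `data:` payloads — can be sketched as follows. This is a minimal illustration; `parseSseChunk` is a hypothetical helper, not the PR's actual code.

```typescript
// Hedged sketch of the SSE chunk parsing the PR describes. Incomplete trailing
// lines are carried over in `buffer`; "id: N" lines update the resumption
// sequence; "data: ..." lines become log messages; ":" keepalives are ignored.
function parseSseChunk(
  buffer: string,
  chunk: string,
): { buffer: string, lastSequence: number | null, messages: string[] } {
  const messages: string[] = []
  let lastSequence: number | null = null
  const lines = (buffer + chunk).split('\n')
  const rest = lines.pop() || '' // keep the trailing incomplete line for next time
  for (const line of lines) {
    if (line.startsWith('id: ')) {
      const seq = Number.parseInt(line.slice(4).trim(), 10)
      if (!Number.isNaN(seq))
        lastSequence = seq
    }
    else if (line.startsWith('data: ')) {
      messages.push(line.slice(6)) // strip the "data: " prefix
    }
    // lines starting with ":" are SSE keepalive comments and are skipped
  }
  return { buffer: rest, lastSequence, messages }
}
```

A caller would feed each decoded network chunk through this function and thread the returned `buffer` into the next call.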
@copilot review please :)
The AbortController was created but never used to abort the fetch. Simplified the code by removing the dead code. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pull request overview
This pull request enhances the build log streaming functionality by removing the complex proxy system and implementing direct Server-Sent Events (SSE) streaming with improved resilience. The changes introduce timeout handling, automatic reconnection with sequence tracking for resumption, and better error handling for various failure scenarios.
Changes:
- Introduced `readWithTimeout` function to detect stale connections and trigger reconnection
- Refactored `streamBuildLogs` to accept a direct SSE endpoint URL and JWT token instead of constructing the endpoint from host/job/app parameters
- Implemented reconnection logic with sequence tracking to resume from the last received message
- Enhanced error handling to differentiate between temporary failures (retry) and terminal errors (don't retry)
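A minimal sketch of what a `readWithTimeout` helper like the one described could look like — racing `reader.read()` against a timer so a stalled connection surfaces as an error instead of blocking forever. The `ReadTimeoutError` class and exact signature are assumptions, not the PR's implementation.

```typescript
// Hypothetical shape of readWithTimeout: whichever of read() or the timer
// settles first wins; the timer is always cleared in finally.
class ReadTimeoutError extends Error {}

async function readWithTimeout(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  timeoutMs: number,
): Promise<{ done: boolean, value?: Uint8Array }> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new ReadTimeoutError(`no data received for ${timeoutMs}ms`)), timeoutMs)
  })
  try {
    return await Promise.race([reader.read(), timeout])
  }
  finally {
    clearTimeout(timer)
  }
}
```

The caller can catch `ReadTimeoutError`, cancel the reader, and fall through to the reconnection path.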
```diff
+ while (reconnectAttempts < maxReconnectAttempts) {
+   if (finalStatus) {
+     break
+   }
+   // Direct SSE: Connect directly to CF Worker with JWT token
+   let logUrl = `${directLogsUrl}?token=${encodeURIComponent(directLogsToken)}`
+   if (lastSequence >= 0) {
+     logUrl += `&last_sequence=${lastSequence}`
+   }

-   const reader = response.body?.getReader()
-   if (!reader) {
-     log.warn('No response body for log stream')
-     return null
+   if (reconnectAttempts === 0) {
+     log.info('Connecting to log stream...')
+   }
+   else {
+     log.info(`Reconnecting to log stream (attempt ${reconnectAttempts + 1}/${maxReconnectAttempts})...`)
+   }

-   log.info('Reading log stream...')
+   let reader: ReadableStreamDefaultReader<Uint8Array> | null = null

-   const decoder = new TextDecoder()
-   let buffer = '' // Buffer for incomplete lines
+   try {
+     const response = await fetch(logUrl)

-   // Helper to process and display a log message
-   const processLogMessage = (message: string) => {
-     if (!message.trim())
-       return
+     if (!response.ok) {
+       const errorText = await response.text().catch(() => 'unknown error')

-     // Check for final status messages from the server
-     // Server sends "Build succeeded", "Build failed", "Job already succeeded", etc.
-     const statusMatch = message.match(/^(?:Build|Job already) (succeeded|failed|expired|released|cancelled)$/i)
-     if (statusMatch) {
-       finalStatus = statusMatch[1].toLowerCase()
-       // Don't display status messages as log lines - they'll be displayed as final status
-       return
+       // If we get 404, the job might be done - don't retry
+       if (response.status === 404) {
+         log.warn(`Log stream not found (job may have completed)`)
+         return null
+       }
+       if (response.status === 401 || response.status === 403) {
+         log.warn(`Log stream authorization failed (${response.status})`)
+         return null
+       }
+       log.warn(`Could not stream logs (${response.status}): ${errorText}`)
+       reconnectAttempts++
+       await new Promise(resolve => setTimeout(resolve, reconnectDelayMs))
+       continue
+     }

-     // Don't display logs after we've received a final status (e.g., cleanup messages after failure)
-     if (finalStatus)
-       return

-     // Print log line directly to console (no spinner to avoid _events errors)
-     if (!hasReceivedLogs) {
-       hasReceivedLogs = true
-       log.info('') // Add blank line before first log
+     reader = response.body?.getReader() ?? null
+     if (!reader) {
+       log.warn('No response body for log stream')
+       reconnectAttempts++
+       await new Promise(resolve => setTimeout(resolve, reconnectDelayMs))
+       continue
+     }
-     log.info(message)
-   }

-   while (true) {
-     const { done, value } = await reader.read()
-     if (done)
-       break
+     // Successfully connected - reset reconnect attempts
+     reconnectAttempts = 0

-     buffer += decoder.decode(value, { stream: true })
+     const decoder = new TextDecoder()
+     let buffer = '' // Buffer for incomplete lines

-     // Process complete lines (SSE format: "data: message\n\n")
-     const lines = buffer.split('\n')
-     // Keep the last incomplete line in the buffer
-     buffer = lines.pop() || ''
+     while (true) {
+       // Use timeout to detect stale connections quickly
+       const { done, value } = await readWithTimeout(reader, readTimeoutMs)
+       if (done) {
+         // Stream ended normally
+         break
+       }

-     for (const line of lines) {
-       if (line.startsWith('data: ')) {
-         const message = line.slice(6) // Remove "data: " prefix
-         processLogMessage(message)
+       buffer += decoder.decode(value, { stream: true })

+       // Process complete lines (SSE format: "data: message\n\n" or "id: N\ndata: message\n\n")
+       const lines = buffer.split('\n')
+       // Keep the last incomplete line in the buffer
+       buffer = lines.pop() || ''

+       for (const line of lines) {
+         // Track sequence ID if present (for resumption)
+         if (line.startsWith('id: ')) {
+           const seqStr = line.slice(4).trim()
+           const seq = Number.parseInt(seqStr, 10)
+           if (!Number.isNaN(seq)) {
+             lastSequence = seq
+           }
+         }
+         // Process data lines
+         else if (line.startsWith('data: ')) {
+           const message = line.slice(6) // Remove "data: " prefix
+           const gotFinalStatus = processLogMessage(message)
+           if (gotFinalStatus) {
+             // Got final status, we're done
+             reader.cancel().catch(() => {})
+             return finalStatus
+           }
+         }
+         // Ignore keepalive comments (lines starting with :)
+       }
+     }

-     // Process any remaining data in buffer
-     if (buffer.startsWith('data: ')) {
-       const message = buffer.slice(6)
-       processLogMessage(message)
+     // Process any remaining data in buffer
+     if (buffer.startsWith('data: ')) {
+       const message = buffer.slice(6)
+       processLogMessage(message)
+     }

+     // Stream ended normally - if we have final status, return it
+     if (finalStatus) {
+       return finalStatus
+     }

+     // Stream ended without final status - might need to reconnect
+     // But first check if this was a clean close (job might be done)
+     log.info('Log stream ended, checking if more data available...')
+     reconnectAttempts++
+     await new Promise(resolve => setTimeout(resolve, reconnectDelayMs))
+   }
+   catch (err) {
```
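The final-status detection that the diff's `processLogMessage` performs can be isolated as a small helper. The regex below is taken from the diff itself; `extractFinalStatus` is an illustrative name, not the PR's actual function.

```typescript
// Hedged sketch: match the server's terminal messages ("Build succeeded",
// "Job already failed", etc.) and return the normalized status, or null for
// ordinary log lines.
const FINAL_STATUS_RE = /^(?:Build|Job already) (succeeded|failed|expired|released|cancelled)$/i

function extractFinalStatus(message: string): string | null {
  const match = message.match(FINAL_STATUS_RE)
  return match ? match[1].toLowerCase() : null
}
```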
The reconnection logic can potentially loop indefinitely if the server repeatedly accepts connections but closes the stream without sending a final status message. The reconnectAttempts counter is reset to 0 on line 339 after each successful connection, then incremented to 1 on line 396 when the stream ends. If the next connection succeeds, it resets to 0 again. This means the function will never hit maxReconnectAttempts as long as connections keep succeeding, even if none of them produce a final status. Consider whether reconnectAttempts should track total loop iterations rather than just connection failures, or add a separate counter for total attempts regardless of connection success.
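One way to realize this suggestion is to keep two counters: one for consecutive failures (reset on success, as the PR already does) and one for total connection cycles that is never reset. A hedged sketch, with all names and limits hypothetical:

```typescript
// Illustrative counters: the loop stops either after too many consecutive
// failures or after an overall budget of connection cycles is exhausted,
// even if every individual connection succeeds.
let consecutiveFailures = 0
let totalAttempts = 0
const maxConsecutiveFailures = 5
const maxTotalAttempts = 50

function shouldKeepStreaming(): boolean {
  return consecutiveFailures < maxConsecutiveFailures && totalAttempts < maxTotalAttempts
}

function onConnectionSucceeded(): void {
  consecutiveFailures = 0 // transient-failure counter resets on success...
  totalAttempts++ // ...but the overall budget keeps counting
}

function onConnectionFailed(): void {
  consecutiveFailures++
  totalAttempts++
}
```

With this shape, a server that keeps accepting connections but never sends a final status can no longer keep the client looping forever.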
```ts
break
}
// Direct SSE: Connect directly to CF Worker with JWT token
let logUrl = `${directLogsUrl}?token=${encodeURIComponent(directLogsToken)}`
```
The JWT token is being passed as a URL query parameter, which can expose it in server logs, browser history, and referrer headers. Consider using a more secure method such as passing the token in an Authorization header instead. This is especially important for JWT tokens that may have longer validity periods.
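The suggested alternative could look like the sketch below: keep only the sequence cursor in the query string and move the token into an `Authorization` header. The parameter names mirror the PR (`directLogsUrl`, `directLogsToken`); `buildLogRequest` is a hypothetical helper, and header-based auth would require matching support on the CF Worker side.

```typescript
// Hedged sketch: build the SSE request with the JWT in a header rather than
// the URL, so it does not leak into access logs, history, or referrers.
function buildLogRequest(
  directLogsUrl: string,
  directLogsToken: string,
  lastSequence: number,
): { url: string, headers: Record<string, string> } {
  const url = new URL(directLogsUrl)
  if (lastSequence >= 0)
    url.searchParams.set('last_sequence', String(lastSequence))
  return {
    url: url.toString(),
    headers: {
      'Authorization': `Bearer ${directLogsToken}`,
      'Accept': 'text/event-stream',
    },
  }
}
```

Usage would then be `const { url, headers } = buildLogRequest(directLogsUrl, directLogsToken, lastSequence)` followed by `fetch(url, { headers })`.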
```diff
+ const response = await fetch(logUrl)

- // Helper to process and display a log message
- const processLogMessage = (message: string) => {
-   if (!message.trim())
-     return
+ if (!response.ok) {
+   const errorText = await response.text().catch(() => 'unknown error')
```
The AbortController is created but never used. The signal is passed to fetch, but the controller's abort() method is never called, meaning the controller serves no purpose. Either remove the controller if it's not needed, or call controller.abort() when cleaning up the reader on timeout or error to properly cancel the fetch request and release resources.
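The pattern the comment asks for can be sketched as follows: create the controller, pass its signal to `fetch`, and expose a `cancel` function to call on timeout or final status. `createAbortableFetch` is a hypothetical helper, not the PR's code.

```typescript
// Hedged sketch: tie the fetch to an AbortController so cancellation actually
// tears down the underlying connection instead of leaving it open.
function createAbortableFetch(url: string): { promise: Promise<Response>, cancel: () => void } {
  const controller = new AbortController()
  return {
    promise: fetch(url, { signal: controller.signal }),
    cancel: () => controller.abort(), // call on timeout / final status / cleanup
  }
}
```

Calling `cancel()` rejects the pending `fetch` (and any in-flight body read) with an `AbortError`, releasing the socket.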
Already fixed, check again
@WcaleNieWolny mergeable?
Will not merge; we found a better solution.



Changes
- `readWithTimeout` function to handle read operations with a specified timeout.
- `streamBuildLogs` to support direct SSE streaming using a provided URL and token.
- `streamBuildLogs` to accept new arguments for direct log streaming.

Motivation
Martin told me to improve the Capgo Native build. I believe that by removing the complex proxy system around server-sent events we decrease the cost of running the Capgo Native builder and provide a more reliable experience for the end user.
Business impact
Low/medium
Removing this complex proxy system reduces both the cost and the complexity of Capgo Builder, which improves code quality while lowering operating costs. It also makes log streaming more reliable for the user.