Fix LLM streamProcess to emit tokens in real-time using callbackFlow #237

@sanchitmonga22

Description

Problem Statement

The LLM streaming implementation in LLMComponent.kt (lines 512-529) buffers all tokens before emitting them, defeating the purpose of streaming.

Current behavior:

  • Tokens are collected into a list via callback
  • Only after streamGenerate returns are tokens emitted to the Flow
  • Consumers wait for entire generation to complete before receiving any tokens
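For reference, the buffering pattern looks roughly like this (an illustrative sketch, not the actual LLMComponent.kt code). The likely root cause is that `flow { }` cannot call `emit` from inside a non-suspending callback, so the tokens get collected into a list instead:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Illustrative stand-in for the current buffered implementation: the
// callback fills a list, and nothing reaches the Flow until the
// generate call has fully returned.
fun bufferedStream(generate: ((String) -> Unit) -> Unit): Flow<String> = flow {
    val buffered = mutableListOf<String>()
    generate { token -> buffered.add(token) }  // tokens accumulate here
    buffered.forEach { emit(it) }              // emitted only after completion
}

fun main() = runBlocking {
    val tokens = bufferedStream { onToken ->
        listOf("Hel", "lo", "!").forEach(onToken)
    }.toList()
    println(tokens)  // all tokens arrive at once, after generation finished
}
```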

Expected behavior:

  • Tokens should be emitted immediately as they arrive
  • Consumers receive real-time updates during generation

Impact

  • High - Core streaming functionality is broken
  • Priority: P1

Proposed Solution

Restructure to use callbackFlow with trySend, similar to STTComponent.streamTranscribe (lines 342-382):

fun streamProcess(input: LLMInput): Flow<LLMGenerationChunk> = callbackFlow {
    ensureReady()
    val service = llmService ?: throw SDKError.ComponentNotReady("LLM service not available")
    input.validate()

    val options = input.options ?: RunAnywhereGenerationOptions(...)
    val prompt = buildPrompt(input.messages, input.systemPrompt ?: llmConfiguration.effectiveSystemPrompt)

    var chunkIndex = 0
    val sessionId = generateUUID()

    try {
        // Emit each token the moment the callback delivers it; trySend is
        // non-suspending, so it is safe inside a plain callback
        service.streamGenerate(prompt, options) { token ->
            trySend(LLMGenerationChunk(
                text = token,
                chunkIndex = chunkIndex++,
                sessionId = sessionId,
                isComplete = false
            ))
        }

        // Send final completion chunk
        trySend(LLMGenerationChunk(
            text = "",
            chunkIndex = chunkIndex,
            sessionId = sessionId,
            isComplete = true,
            finishReason = FinishReason.COMPLETE
        ))
        close()
    } catch (e: Exception) {
        close(e)  // propagate generation failures to collectors
    }

    awaitClose { }
}

Implementation Plan

  • Replace flow { } with callbackFlow { }
  • Use trySend inside the callback instead of collecting to a list
  • Emit final completion chunk with isComplete = true
  • Ensure proper error handling with close(exception)
  • Add awaitClose { } for proper cleanup
  • Test streaming behavior in Android sample app
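The close/awaitClose shape from the plan above can be sketched in isolation (the token source here is hypothetical, standing in for `service.streamGenerate`; it is not the SDK's API):

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical token source standing in for service.streamGenerate
fun streamTokens(generate: ((String) -> Unit) -> Unit): Flow<String> = callbackFlow {
    try {
        generate { token ->
            // trySend never suspends; its result reports whether the
            // channel is still open (e.g. the collector was cancelled)
            trySend(token)
        }
        close()       // normal completion ends the Flow
    } catch (e: Exception) {
        close(e)      // collectors see this as a thrown exception
    }
    awaitClose { /* unregister listeners / free native resources here */ }
}

fun main() = runBlocking {
    val tokens = streamTokens { onToken -> listOf("a", "b", "c").forEach(onToken) }.toList()
    check(tokens == listOf("a", "b", "c"))
    println("streamed: $tokens")
}
```

Note that `awaitClose` must be the last statement in the `callbackFlow` body: it keeps the channel scope alive for callback-based producers and runs its block on cancellation as well as normal completion.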

Files to Modify

  • sdk/runanywhere-kotlin/src/commonMain/kotlin/com/runanywhere/sdk/components/llm/LLMComponent.kt

Success Criteria

  • Tokens are emitted immediately as they arrive
  • Chat UI shows progressive text generation
  • No token buffering or delays
  • Proper cancellation support
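As a sanity check for the first two criteria, a consumer-side sketch (chunk fields taken from the issue's snippet; the fake flow is a stand-in for `streamProcess`, not the real component):

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Minimal data shape mirroring LLMGenerationChunk's fields from the snippet
data class Chunk(val text: String, val isComplete: Boolean)

// Fake stream standing in for llmComponent.streamProcess(input)
fun fakeStream(): Flow<Chunk> = flowOf(
    Chunk("Hel", false), Chunk("lo", false), Chunk("", true)
)

fun main() = runBlocking {
    val rendered = StringBuilder()
    fakeStream().collect { chunk ->
        // A chat UI would append here on every chunk, not after completion
        if (!chunk.isComplete) rendered.append(chunk.text)
    }
    check(rendered.toString() == "Hello")
    println(rendered)
}
```

Cancellation comes for free with `callbackFlow`: cancelling the collecting coroutine closes the channel, subsequent `trySend` calls fail, and the `awaitClose` block runs for cleanup.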
