Conversation


@lukiod lukiod commented Dec 20, 2025

Summary

This PR enables ComfyStream to support Generative Streams (e.g., Text-to-Video workflows) where the client provides prompt data via the WebRTC Data Channel but sends no input video/audio stream.

Previously, the pipeline would halt or fail to initialize without an incoming media stream to drive the clock. This change allows the pipeline to self-drive frame generation in "Generative Mode."

Changes

  • src/comfystream/modalities.py: Added PrimitiveString to the list of recognized text_input nodes. This allows workflows using PrimitiveString for prompts to be correctly validated as accepting text input.
  • server/app.py: Updated the WebRTC offer handling to proactively create VideoStreamTrack and AudioStreamTrack for the output, even if the client has not offered any input tracks.
  • src/comfystream/pipeline.py:
    • Implemented logic in get_processed_video_frame to generate frames when in generative mode (no input, but output exists).
    • Added synthetic Presentation Time Stamp (PTS) generation to ensure smooth playback for generated streams.
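
The synthetic PTS bookkeeping described above can be sketched roughly as follows. This is a minimal illustration of the idea, not the actual `pipeline.py` implementation; the class name and the fixed 30 fps rate are assumptions (the verification script below checks for a 1/30 time base):

```python
from fractions import Fraction

FPS = 30  # assumed output frame rate for generative mode

class SyntheticPTS:
    """Sketch of synthetic Presentation Time Stamp generation: with no
    input stream to drive the clock, each generated frame gets the next
    integer PTS against a fixed 1/FPS time base, so frame N is presented
    at N / FPS seconds."""

    def __init__(self, fps: int = FPS):
        self.time_base = Fraction(1, fps)
        self._count = 0

    def next_pts(self) -> int:
        pts = self._count
        self._count += 1
        return pts

clock = SyntheticPTS()
print(clock.next_pts(), clock.next_pts())  # 0 1
print(clock.time_base)                     # 1/30
```

In a real pipeline the PTS and time base would be assigned to each outgoing `av.VideoFrame` before it is handed to the output track.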

Impact

Testing

  • Verified manually using the verification script below to confirm the pipeline generates frames with correct timestamps in the absence of input:
import asyncio
import os
import sys

import torch
from unittest.mock import MagicMock, AsyncMock, patch

# Add src to path just in case
sys.path.insert(0, os.path.abspath("src"))

from comfystream.pipeline import Pipeline

async def verify():
    print("Starting verification for Generative Pipeline...")
    
    # Mock Client
    mock_client = MagicMock()
    # Return a black frame tensor (1, H, W, 3)
    mock_client.get_video_output = AsyncMock(return_value=torch.zeros((1, 512, 512, 3)))
    mock_client.set_prompts = AsyncMock()
    mock_client.update_prompts = AsyncMock()
    
    # Patch ComfyStreamClient in pipeline module so Pipeline uses our mock
    with patch('comfystream.pipeline.ComfyStreamClient', return_value=mock_client):
        pipeline = Pipeline()
        
    # Mock IO capabilities to simulate Generative Video Mode
    # Video: Input=False, Output=True
    pipeline._cached_io_capabilities = {
        "video": {"input": False, "output": True},
        "audio": {"input": False, "output": False},
        "text": {"input": True, "output": False}
    }
    
    # Test get_processed_video_frame
    try:
        print("Requesting Frame 1...")
        frame1 = await pipeline.get_processed_video_frame()
        print(f"Frame 1 - PTS: {frame1.pts}, Timebase: {frame1.time_base}")
        
        if frame1.pts != 0:
            print(f"FAILURE: Expected PTS 0, got {frame1.pts}")
            return
            
        print("Requesting Frame 2...")
        frame2 = await pipeline.get_processed_video_frame()
        print(f"Frame 2 - PTS: {frame2.pts}, Timebase: {frame2.time_base}")
        
        if frame2.pts != 1:
            print(f"FAILURE: Expected PTS 1, got {frame2.pts}")
            return
            
        if frame2.time_base.denominator != 30:
            print(f"FAILURE: Expected Timebase 1/30, got {frame2.time_base}")
            return

        print("Verification SUCCESS: Generative frames produced with correct PTS.")
        
    except Exception as e:
        print(f"Verification FAILED with exception: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    asyncio.run(verify())
  • Verified PrimitiveString detection works correctly for modality negotiation.
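
The PrimitiveString detection can be illustrated with a hedged sketch like the one below. The set contents and function name are assumptions for illustration; the real list and helpers live in `src/comfystream/modalities.py` and may differ:

```python
# Hypothetical sketch of text-input node detection for modality
# negotiation; TEXT_INPUT_NODES contents are assumed, not the real list.
TEXT_INPUT_NODES = {"CLIPTextEncode", "PrimitiveString"}

def workflow_accepts_text(prompt: dict) -> bool:
    """Return True if any node in the workflow graph is a recognized
    text-input node, i.e. the client can drive it over the data channel."""
    return any(
        node.get("class_type") in TEXT_INPUT_NODES
        for node in prompt.values()
    )

workflow = {
    "1": {"class_type": "PrimitiveString", "inputs": {"value": "a cat"}},
    "2": {"class_type": "SaveTensor", "inputs": {"images": ["1", 0]}},
}
print(workflow_accepts_text(workflow))  # True
```

With `PrimitiveString` added to the recognized set, workflows using it for prompts validate as accepting text input.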

Collaborator

@eliteprox eliteprox left a comment


@lukiod These changes are looking great! To demo how to build generative workflows in ComfyStream, I made a text-to-images workflow for testing using the Text Renderer and Batch Images nodes. One small edit was needed to the SaveTensor node to support batched image outputs from workflows like this.

Please review the suggested additions to the SaveTensor node, which allowed for testing the workflow (link in PR): lukiod#4
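
The batched-output handling can be sketched roughly like this. This is a hedged illustration of the idea, not the actual SaveTensor code; the queue stand-in and function name are assumptions. A 4-D `[B, H, W, C]` tensor is split along the batch dimension so each queued item is a single 3-D `[H, W, C]` image:

```python
import torch
from queue import Queue

# Stand-in for comfystream.tensor_cache.image_outputs
image_outputs: Queue = Queue()

def save_batched(images: torch.Tensor) -> None:
    """Sketch: normalize 3-D or 4-D inputs so the queue only ever
    holds individual [H, W, C] images."""
    if images.dim() == 3:
        # Single image: queue as-is
        image_outputs.put(images)
    elif images.dim() == 4:
        # Batch: unbind into B separate [H, W, C] images
        for img in torch.unbind(images, dim=0):
            image_outputs.put(img)
    else:
        raise ValueError(f"Expected 3-D or 4-D tensor, got {images.dim()}-D")

save_batched(torch.randn(2, 64, 64, 3))
print(image_outputs.qsize())  # 2
```

This matches the behavior the second test script below checks: both batch-size-1 and batch-size-2 inputs end up as 3-D items in the queue.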


lukiod commented Dec 25, 2025

@eliteprox I have fixed the threading issue and standardized the image handling. This is the test I ran to check what save_tensor.py returns when a 3-D or a 4-D PyTorch tensor is passed:


import asyncio
import importlib.util
import os
import sys

import torch

# Setup path for comfystream import
sys.path.append(os.path.join(os.getcwd(), 'src'))

# Import tensor_cache
from comfystream import tensor_cache

# Load SaveTensor module directly to avoid full 'nodes' package init
file_path = os.path.join("nodes", "tensor_utils", "save_tensor.py")
spec = importlib.util.spec_from_file_location("save_tensor_mod", file_path)
save_tensor_mod = importlib.util.module_from_spec(spec)
sys.modules["save_tensor_mod"] = save_tensor_mod
spec.loader.exec_module(save_tensor_mod)

SaveTensor = save_tensor_mod.SaveTensor

async def verify_dimensions():
    print("Initializing test loop...")
    loop = asyncio.get_running_loop()
    tensor_cache.init(loop)
    
    node = SaveTensor()
    
    print("\n--- Test Case 1: Batch Size = 1 ---")
    # Shape: [1, 512, 512, 3] -> Expected in queue: [512, 512, 3]
    batch_one = torch.randn(1, 512, 512, 3)
    node.execute(batch_one)
    
    try:
        item = await asyncio.wait_for(tensor_cache.image_outputs.get(), timeout=1.0)
        print(f"Retrieved item shape: {item.shape}")
        if item.dim() == 3:
            print("SUCCESS: Item is 3D.")
        else:
            print(f"FAILURE: Item is {item.dim()}D (Expected 3D).")
    except asyncio.TimeoutError:
        print("FAILURE: Timeout (queue empty).")

    print("\n--- Test Case 2: Batch Size = 2 ---")
    # Shape: [2, 512, 512, 3] -> Expected in queue: [512, 512, 3] (x2)
    batch_two = torch.randn(2, 512, 512, 3)
    node.execute(batch_two)
    
    for i in range(2):
        try:
            item = await asyncio.wait_for(tensor_cache.image_outputs.get(), timeout=1.0)
            print(f"Retrieved item {i+1} shape: {item.shape}")
            if item.dim() == 3:
                print(f"SUCCESS: Item {i+1} is 3D.")
            else:
                print(f"FAILURE: Item {i+1} is {item.dim()}D (Expected 3D).")
        except asyncio.TimeoutError:
            print(f"FAILURE: Timeout on item {i+1}.")

    print("\nVerification Complete.")

if __name__ == "__main__":
    asyncio.run(verify_dimensions())

@lukiod lukiod marked this pull request as ready for review December 25, 2025 03:00

Development

Successfully merging this pull request may close these issues.

ComfyStream Generative Workflows