Outcome
Developers go from "I have a Python ML model" to a discoverable BYOC capability on Livepeer in under 5 minutes — surfaced in the Developer Dashboard, ready for any caller to invoke.
The Pipeline SDK is the authoring surface that makes this possible: write a Python class, get a containerised, BYOC-compatible, schema-described capability.
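A minimal sketch of that authoring surface, assuming the Pipeline base class, setup()/predict() hooks, and serve() entrypoint named in the roadmap below; the import path, exact signatures, and whether serve() takes a class or an instance are assumptions here, and the spec stays authoritative.

```python
# Hypothetical hello-world capability; names follow this issue's roadmap
# (Pipeline, setup, predict, serve). Import path and signatures are assumed.
from livepeer.runner import Pipeline, serve  # assumed import path

class TextReverser(Pipeline):
    def setup(self) -> None:
        # one-time model / resource loading happens here
        pass

    def predict(self, text: str) -> str:
        # typed params let the runner derive the /predict and /openapi.json schema
        return text[::-1]

if __name__ == "__main__":
    # serve() wraps the pipeline in the FastAPI app (/health, /predict, /docs, /openapi.json)
    serve(TextReverser())
```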
Spec
Design lives in livepeer-specs / pipeline-sdk.md. Update the spec rather than this issue body when the design moves.
Architecture decisions (monorepo + PEP 420 namespace packages, three distributions livepeer-runner / livepeer-client / livepeer-trickle) are captured in the spec — see the Architecture and the companion Client SDK packaging sections.
Roadmap
Each step yields a working SDK strictly more capable than the previous one.
C1 — Pipeline base + serve() + hello-world BYOC E2E
C2 — setup() lifecycle + HuggingFace sentiment example
C8 breakdown — LivePipeline
Step 5 — live_grayscale example + chroma assertion + ffplay viewer — 9ef95d9 series
Step 4 — _LiveSession + lifecycle (the only outstanding piece)
_LiveSession class encapsulating per-session state
Periodic heartbeat on events_url (gateway liveness signal)
emit_event(payload) user-facing helper
emit_data(payload) helper for data_url when enable_data_output=true
on_stream_stop lifecycle hook
Drain runner-side state on stop — measured: no leak (RSS plateaus ~170 MB after 25 sessions). State drain unnecessary.
Unified error surface — three error sources (subscribe, publish, and a user process_video raising) log distinctly today with no consistent state propagation. Add _record_error(source, exc, severity): a structured ErrorEvent schema (severity ∈ WARN / ERROR / FATAL, plus source, message, timestamp, consecutive), a per-source budget escalating WARN→ERROR after N consecutive failures, flipping pipeline._state = ERROR on terminal failures, and pushing events via events_url (see the sketch after this list). Quick first step (~5 LOC): flip _state on TricklePublisherTerminalError. Do the full schema after live_transcribe surfaces real failure modes.
Verify the live-viewer demo can be brought back once heartbeat + state-drain land
Pydantic param schema for LivePipeline — today on_stream_start(params) and on_params_update(params) receive dict[str, Any]; users parse and validate manually. The batch Pipeline already supports typed params via signature introspection (C4). Extend the same to LivePipeline: let users type on_stream_start(params: MyParams) and have the runner introspect the annotation, validate at the HTTP boundary, and emit a meaningful /openapi.json schema for /stream/start's caller-supplied params (today the schema only describes the orchestrator's protocol fields). Required for the developer dashboard / client SDK to render param controls for live capabilities (see the typed-params sketch after this list).
Enrich heartbeat payload with PipelineStatus (ai-runner pattern) — today's heartbeat is a minimal {"type": "heartbeat", "timestamp": …} keep-alive only. ai-runner's report_status_loop uses the same trickle push as both keep-alive AND status report (state, FPS, last_error, restart_count, last_params): one mechanism, dual purpose. Add FPS counters to MediaOutput / MediaPublish and swap the heartbeat payload for the rich shape. Keeps /health minimal (k8s contract).
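A sketch of the _record_error / ErrorEvent shape proposed in the unified-error-surface item above. The field names mirror that item; the class layout, escalation threshold, and budget helper are illustrative assumptions, not existing SDK code.

```python
# Proposed (not existing) error-surface shape for _LiveSession._record_error.
import time
from dataclasses import dataclass, field
from enum import Enum

class Severity(str, Enum):
    WARN = "WARN"
    ERROR = "ERROR"
    FATAL = "FATAL"

@dataclass
class ErrorEvent:
    source: str          # "subscribe" | "publish" | "process_video"
    severity: Severity
    message: str
    consecutive: int
    timestamp: float = field(default_factory=time.time)

class ErrorBudget:
    """Escalate WARN -> ERROR after N consecutive failures per source."""

    def __init__(self, escalate_after: int = 3):
        self.escalate_after = escalate_after
        self._counts: dict[str, int] = {}

    def record(self, source: str, exc: Exception, severity: Severity) -> ErrorEvent:
        n = self._counts.get(source, 0) + 1
        self._counts[source] = n
        if severity is Severity.WARN and n >= self.escalate_after:
            severity = Severity.ERROR
        return ErrorEvent(source=source, severity=severity, message=str(exc), consecutive=n)

    def reset(self, source: str) -> None:
        # call on the first success after a failure streak
        self._counts[source] = 0
```

In this shape, _record_error would build the event, JSON-serialize it onto events_url, and flip pipeline._state = ERROR when severity is FATAL.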
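And a sketch of the typed live params item: a caller-facing Pydantic model annotated on on_stream_start, which the runner would introspect and validate at the HTTP boundary, mirroring the batch Pipeline's C4 behavior. The import path, async hook signatures, and example fields are assumptions.

```python
# Hypothetical typed params for a live capability (the MyParams idea above).
from pydantic import BaseModel
from livepeer.runner import LivePipeline  # assumed import path

class StartParams(BaseModel):
    language: str = "en"        # example caller-supplied params, illustrative only
    chunk_seconds: float = 3.0

class LiveTranscribe(LivePipeline):
    async def on_stream_start(self, params: StartParams) -> None:
        # the runner introspects the annotation, validates the caller's JSON at
        # the HTTP boundary, and emits this shape into /openapi.json for
        # /stream/start's caller-supplied params
        self.language = params.language

    async def on_params_update(self, params: StartParams) -> None:
        self.language = params.language
```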
Companion issues — runner / examples polish
Targeted issues spun off from this epic. Two are blocking and one is cosmetic on the path to full live_transcribe fidelity (5/5 transcripts delivered to SSE).
Blocking (data-loss bugs)
#12 — SDK-side. _resolve_next_seq returns -1 on probe failure; combined with the publisher's +1 increment, this duplicate-POSTs to seg 0 on every trickle channel, dropping the first record. Observable on live_transcribe as missing transcript[1]. Fix: one line (return 0 instead of -1) plus demoting the warning to debug (illustrated in the sketch below).
Upstream: livepeer/go-livepeer#3924 — gateway's data subscriber tears down too early on /stream/stop, dropping the final emit_data from on_stream_stop. Observable on live_transcribe as missing transcript[4] (orch log shows client disconnected on the final POST). Fix: bounded drain loop in byoc/trickle.go:startDataSubscribe.
Together these account for both observed transcript losses (runner emits 5 → SSE delivers 3 today). Either one fixed independently → 4/5 delivered.
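To make the #12 interplay concrete, a toy illustration (not the actual SDK source) of how the probe's -1 plus the publisher's +1 lands on seg 0, and why returning 0 starts publishing at seg 1 instead:

```python
# Toy illustration of issue #12; names and logic are simplified, not the SDK's code.
def resolve_next_seq(probe_succeeded: bool, last_published_seq: int) -> int:
    if not probe_succeeded:
        return -1          # today: publisher computes -1 + 1 == 0 and
                           # duplicate-POSTs seg 0, dropping the first record
        # fix: return 0    # publisher then starts cleanly at seg 1
    return last_published_seq

def next_publish_seq(resolved: int) -> int:
    return resolved + 1    # the publisher's +1 increment described above
```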
Cosmetic (log noise, no functional impact)
Upstream: livepeer/go-livepeer#3922 — spurious ERROR-level logs at every clean /stream/stop (5 fix sites: ffmpeg subprocess output, trickle preconnect, rtmp2segment probe, orch trickle handler). Operations work; logs just look scary. Pure log-level demotion (if ctx.Err() != nil { debug }).
Examples follow-ups
Live viewer tool for live_grayscale — bring back the webcam-pushed live viewer (deleted because the current PyAV decode→user→encode loop can't sustain real-time webcam load: the ring buffer drains and mediamtx kicks the egress publisher). Today the example uses synthetic testsrc + capture-to-file + replay. Restore the webcam viewer once C8 Step 4 lands.
Worked example covering full LivePipeline lifecycle — live_grayscale exercises the SDK plumbing but only overrides process_video. live_transcribe (Whisper STT) and live_depth (DepthAnything V2) now exercise more of the lifecycle (setup, on_stream_start, process_audio, emit_data, emit_event, on_stream_stop). Still TODO: a test.sh that subscribes to data_url from the caller side and asserts structured records arrive — needs start_byoc_job from #6.
Exercise on_params_update in an example — the only LivePipeline hook with no live demo. Fires on mid-stream parameter changes (the caller pushes new params to /stream/params without a restart). Smallest viable demo: extend live_detect to accept {"detection_threshold": 0.5} mid-stream and update the YOLO confidence cutoff in-place, with a test.sh step that pushes a new threshold mid-run and asserts the emitted records reflect it (see the sketch after this list). Alternatively, document the hook in the SDK README and defer the example until a real use case demands it.
Migrate /stream/params and /stream/stop to control_url subscribe — per the spec, the long-term shape is one HTTP endpoint (/stream/start) plus everything else over the trickle plane. Today BYOC already publishes params + keepalives to control_url (byoc/trickle.go:539) but the orchestrator HTTP-forwards each message via /stream/params (byoc/stream_orchestrator.go:421). Migration must be coordinated upstream: orchestrator drops the HTTP-forwarding step + runner adds trickle subscribe in lockstep. Blocked on: trickle control-channel size / changeover bug (see "Future protocol work" below).
Production-grade live transcribe example — live_transcribe is intentionally the minimal lifecycle demo, with explicit 3 s chunking + vad_filter=True; first-transcript latency is ~3 s and word boundaries can split mid-window. For users who actually want production live transcription, add a separate example using whisper_streaming's OnlineASRProcessor (LocalAgreement-2 → ~1 s latency, cleaner boundaries): same LivePipeline lifecycle, different process_audio body (sketched after this list). Same folder shape as live_transcribe, marketed as "production transcribe". Possibly also covers VAD-driven segmentation and the emit_data partial-vs-final transcript distinction.
Recover from orchestrator capability drop — gateway sometimes drops the orchestrator from its capability pool after stream failures (Retrying stream with a different orchestrator err=unknown swap reason → no orchestrators available, ending stream). Once dropped, every subsequent /process/stream/start either 400s or kills mid-flight, until register_capability is re-run manually. Investigate (a) re-register watchdog, (b) healthcheck-driven re-register hook, or (c) push a fix upstream in go-livepeer's gateway swap-orch logic.
Switch examples to -network offchain once go-livepeer #3906 lands. Current compose files run with -network arbitrum-one-mainnet -ethUrl https://arb1.arbitrum.io/rpc -ethPassword secret-password and rely on pricePerUnit=0, so no real on-chain payment occurs — but the gateway still polls Arbitrum for orchestrator stake lookups (db_discovery.go), and the public RPC rate-limits, leaving 429 Too Many Requests lines all over the gateway log. Tracked upstream as livepeer/go-livepeer#3905. When the PR merges, drop -network, -ethUrl, -ethPassword from each example's docker-compose.yml (5 files) and run with bare -network offchain. Eliminates the 429 noise entirely.
Assert grayscale, not just bytes-received — live_grayscale/test.sh now extracts U / V plane averages via ffprobe signalstats and asserts ≈128 (chroma-zero = grayscale).
End state: retire examples/runner/, replace with unit tests, move worked examples to a separate repo — once the SDK stabilizes, delete the in-tree examples/runner/ folder. The lifecycle/coverage value those examples currently provide (setup, on_stream_start, process_video / process_audio, emit_data, emit_event, on_stream_stop, error paths) gets reified as proper unit tests inside livepeer-python-gateway. The worked examples themselves (live_grayscale, live_transcribe, live_depth, replicate_flux, …) move to a standalone livepeer/pipeline-examples repo that depends on the published livepeer.runner package as a normal pip dependency. Aligns examples with how external developers actually consume the SDK, decouples example evolution from SDK release cadence, and keeps this repo focused on the runner itself.
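For the on_params_update item above, a sketch of the smallest viable live_detect extension. The hook names come from this issue; the YOLO usage, frame handling, and emit_data payload are illustrative assumptions about what the example might do.

```python
# Hypothetical live_detect extension exercising on_params_update.
from livepeer.runner import LivePipeline  # assumed import path

class LiveDetect(LivePipeline):
    def setup(self) -> None:
        from ultralytics import YOLO      # live_detect's model (assumed dependency)
        self.model = YOLO("yolov8n.pt")
        self.threshold = 0.25             # default confidence cutoff

    async def on_params_update(self, params: dict) -> None:
        # caller POSTs {"detection_threshold": 0.5} to /stream/params mid-stream
        if "detection_threshold" in params:
            self.threshold = float(params["detection_threshold"])

    async def process_video(self, frame):
        # run detection with the current (possibly updated) cutoff and publish
        # a structured record; the frame API here is an assumption
        results = self.model(frame.to_ndarray(format="rgb24"), conf=self.threshold)
        await self.emit_data({"boxes": len(results[0].boxes), "conf": self.threshold})
        return frame
```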
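And for the production-transcribe item, a rough sketch of the different process_audio body. The OnlineASRProcessor interface shown (insert_audio_chunk / process_iter) is assumed from the whisper_streaming project and should be checked against the library, as are the pipeline hook signatures.

```python
# Rough sketch only; whisper_streaming API usage is an assumption.
import numpy as np
from livepeer.runner import LivePipeline  # assumed import path

class LiveTranscribePro(LivePipeline):
    def setup(self) -> None:
        from whisper_online import FasterWhisperASR, OnlineASRProcessor  # assumed module
        asr = FasterWhisperASR("en", "large-v3")
        self.online = OnlineASRProcessor(asr)   # LocalAgreement-2 streaming policy

    async def process_audio(self, frame) -> None:
        # feed samples as they arrive instead of fixed 3 s chunks
        samples = frame.to_ndarray().astype(np.float32).ravel()
        self.online.insert_audio_chunk(samples)
        beg, end, text = self.online.process_iter()
        if text:
            await self.emit_data({"start": beg, "end": end, "text": text, "final": False})
```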
Performance & future improvements
Captured while building examples that surfaced specific optimization opportunities. Not roadmap-blocking; revisit when concrete use cases demand them.
Parallel process_video / process_audio execution — the frame loop today dispatches both hooks sequentially in a single async for-loop, so heavy inference in one stalls the other. Refactor into two queues fed from one decoder and drained in separate tasks, as sketched below. Not needed for live_detect or live_transcribe today; triggered by a real pipeline that needs it (e.g. a Moondream2-class video model running alongside Whisper). It is a contract change ("frames may interleave across hooks"), so it deserves explicit design before flipping.
live_describe — VLM-driven video understanding (GPU) — natural-language scene description via Moondream2 (~1.6 GB, ~300 ms / inference on GPU). Same LivePipeline lifecycle as live_detect; swaps YOLO for a vision-language model that emits descriptions instead of bounding boxes. Mirrors live_depth's GPU pattern. Compelling for "real video understanding" positioning; not strictly needed since live_detect already demonstrates a multi-modal LivePipeline.
Concurrent inference patterns documented in SDK README — a three-tier pattern users adopt as inference cost grows: (1) asyncio.to_thread for offloading individual inference calls so the main loop stays responsive, (2) asyncio.create_task for fire-and-forget windowed work (transcribe → emit when done, decoupling inference latency from frame cadence), (3) a separate process / IPC for GPU-isolated heavy models. Should land alongside the parallel process_* refactor above so users understand which knob to reach for.
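A sketch of tiers (1) and (2) inside a process_audio hook, assuming the hook and emit_data names from this issue; the windowing and the blocking transcribe helper are placeholders.

```python
# Illustrative tiers 1 and 2: windowed work off the frame path, blocking
# inference off the event loop. Hook signatures are assumed.
import asyncio
from livepeer.runner import LivePipeline  # assumed import path

class WindowedTranscribe(LivePipeline):
    def setup(self) -> None:
        self._window = []

    async def process_audio(self, frame) -> None:
        self._window.append(frame)
        if len(self._window) < 90:            # rough 3 s window, placeholder
            return
        window, self._window = self._window, []

        # Tier 2: fire-and-forget the windowed work so frame cadence never
        # waits on inference latency.
        asyncio.create_task(self._transcribe_window(window))

    async def _transcribe_window(self, window) -> None:
        # Tier 1: push the blocking model call off the event loop.
        text = await asyncio.to_thread(self._transcribe_blocking, window)
        await self.emit_data({"text": text})  # emit_data helper per this issue

    def _transcribe_blocking(self, window) -> str:
        ...                                   # heavy model call, placeholder
        return ""
```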
Framework adapters (deferred — build on demand)
Migration paths for users coming from existing ML frameworks. Each ships as its own pip package with its own foreign dependency, isolated from the core SDK. Build only when a real migration ask shows up; a sketch of the adapter shape follows the list.
livepeer-runner-cog — wraps cog.BasePredictor
livepeer-runner-fal — wraps fal.App
livepeer-runner-modal — wraps Modal @app.function
livepeer-runner-bentoml — wraps @bentoml.service
livepeer-runner-comfyscript — wraps ComfyScript
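To make the adapter shape concrete, a sketch of what livepeer-runner-cog might look like. Only cog's BasePredictor setup()/predict() API is real here; the livepeer-side names and import path are assumptions from this issue.

```python
# Hypothetical livepeer-runner-cog adapter shape (illustrative only).
from livepeer.runner import Pipeline  # assumed import path

def wrap_cog(predictor_cls):
    """Turn an existing cog.BasePredictor subclass into a Pipeline subclass."""

    class CogAdapter(Pipeline):
        def setup(self) -> None:
            # cog predictors load weights in BasePredictor.setup()
            self._predictor = predictor_cls()
            self._predictor.setup()

        def predict(self, **inputs):
            # forward request fields straight to cog's predict()
            return self._predictor.predict(**inputs)

    return CogAdapter
```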
Future protocol work (cross-team, gated on C9 + upstream go-livepeer)
Fix trickle control-channel size / segment-changeover bug (upstream) — control_url params updates fail silently or get truncated when the payload is larger than small JSON (~1 MB practical ceiling observed). The hunch is segment-changeover behavior during large writes — possibly FirstByteTimeout, pipe buffering on segment boundaries, or chunk-write semantics across the rollover. A workaround in byoc/stream_gateway.go:1007-1009 switched stop / params to HTTP POST after the bug bit on base64-binary payloads. Blocking dependency for the "migrate to control_url subscribe" follow-up above. Requires an upstream go-livepeer change.
Capability identity via OCI digest — replace free-form capability names with content-hashed references like byoc/<repo>@sha256:<digest>. Aligns BYOC with Replicate's reproducibility model. SDK side: livepeer push captures the digest at publish time and bakes it into the manifest. Upstream side: orchestrator registration + gateway routing + OrchestratorInfo carry the digest.
Cosign / Sigstore signing of capability digests — optional layer on top of digest pinning. Publisher signs the digest, gateway verifies signature against publisher's key.
Name:version aliases over digest-pinned wire — Replicate-style mutable names (byoc/text-reverser:v2) that resolve to a digest at lookup time. Wire protocol always pins the digest; aliases are a UX layer.
Related
livepeer-specs/_followups.md
uv workspace docs: https://docs.astral.sh/uv/concepts/projects/workspaces/