Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions jenkins/L0_MergeRequest.groovy
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@Library(['bloom-jenkins-shared-lib@main', 'trtllm-jenkins-shared-lib@main']) _

import java.lang.InterruptedException
import java.nio.charset.StandardCharsets
import groovy.transform.Field
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
Expand Down Expand Up @@ -770,15 +771,23 @@ def getCbtsResult(pipeline, testFilter, globalVars)
return null
}
// Piggyback input JSON on testFilter so each L0_Test stage agent can
// re-run main.py and regenerate cbts_test_db/ locally. Capped at
// 256 KB; oversize → drop piggyback, Layer 3 falls back to source.
// re-run main.py and regenerate cbts_test_db/ locally. The payload is
// base64-encoded because the raw JSON contains PR diffs and may include
// ${...} or {...} sequences that the Jenkins tokenmacro plugin would
// try to evaluate when the parent serializes globalVars for the
// Parameterized-Remote-Trigger plugin, raising MacroEvaluationException
// and blocking test dispatch. Capped at 256 KB (post-encoding, since
// that is what travels on the wire); oversize → drop piggyback,
// Layer 3 falls back to source.
final int CBTS_INPUT_PIGGYBACK_MAX_BYTES = 256000
def inputJsonSize = inputJson.length()
if (inputJsonSize <= CBTS_INPUT_PIGGYBACK_MAX_BYTES) {
result.cbts_input_json = inputJson
pipeline.echo("CBTS Layer 3: cbts_input_json piggyback enabled (${inputJsonSize} bytes)")
def inputJsonB64 = inputJson.getBytes(StandardCharsets.UTF_8).encodeBase64().toString()
def inputJsonB64Size = inputJsonB64.length()
if (inputJsonB64Size <= CBTS_INPUT_PIGGYBACK_MAX_BYTES) {
result.cbts_input_json_b64 = inputJsonB64
pipeline.echo("CBTS Layer 3: cbts_input_json_b64 piggyback enabled " +
"(${inputJsonB64Size} bytes encoded, ${inputJson.length()} bytes raw)")
} else {
pipeline.echo("CBTS Layer 3: cbts_input_json is ${inputJsonSize} bytes, " +
pipeline.echo("CBTS Layer 3: cbts_input_json_b64 is ${inputJsonB64Size} bytes, " +
"exceeds ${CBTS_INPUT_PIGGYBACK_MAX_BYTES}-byte piggyback limit; " +
"downstream stages will fall back to source test-db " +
"(Layer 2 stage filtering still applies)")
Expand Down
26 changes: 20 additions & 6 deletions jenkins/L0_Test.groovy
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@Library(['bloom-jenkins-shared-lib@main', 'trtllm-jenkins-shared-lib@main']) _

import java.lang.InterruptedException
import java.nio.charset.StandardCharsets
import groovy.transform.Field
import groovy.json.JsonOutput
import com.nvidia.bloom.KubernetesManager
Expand Down Expand Up @@ -2456,16 +2457,29 @@ def renderTestDB(pipeline, testContext, llmSrc, stageName, preDefinedMakoOpts=nu

sh "pip3 install --extra-index-url https://urm.nvidia.com/artifactory/api/pypi/sw-tensorrt-pypi/simple --ignore-installed trt-test-db==1.8.5+bc6df7"
// CBTS Layer 3: regenerate cbts_test_db/ on this stage agent from the
// piggybacked input JSON if not already present.
// piggybacked input JSON if not already present. The piggyback payload is
// base64-encoded on the orchestrator (see getCbtsResult in
// L0_MergeRequest.groovy) to keep tokenmacro from interpreting ${...} or
// {...} fragments inside the PR diff when globalVars is serialized. If
// decoding or regeneration throws (truncated/malformed payload), we
// swallow the error: the override directory will be absent below, the
// overrideYaml check will fail, and renderTestDB falls back to the
// source test-db.
def cbts = testFilter[(CBTS_RESULT)]
if (cbts != null && cbts.test_db_dir_override && cbts.cbts_input_json) {
if (cbts != null && cbts.test_db_dir_override && cbts.cbts_input_json_b64) {
def overrideDir = "${llmSrc}/${cbts.test_db_dir_override}"
def dirExists = sh(returnStdout: true, script: "test -d ${overrideDir} && echo yes || echo no").trim()
if (dirExists != "yes") {
def cbtsInputLocal = Utils.createTempLocation(pipeline, "./cbts_input.json")
pipeline.writeFile(file: cbtsInputLocal, text: cbts.cbts_input_json)
sh "apt-get update -qq && apt-get install -y -qq python3-yaml || true"
sh "cd ${llmSrc} && python3 jenkins/scripts/cbts/main.py ${cbtsInputLocal} > /dev/null 2>&1 || true"
try {
def cbtsInputJson = new String(cbts.cbts_input_json_b64.decodeBase64(), StandardCharsets.UTF_8)
def cbtsInputLocal = Utils.createTempLocation(pipeline, "./cbts_input.json")
pipeline.writeFile(file: cbtsInputLocal, text: cbtsInputJson)
sh "apt-get update -qq && apt-get install -y -qq python3-yaml || true"
sh "cd ${llmSrc} && python3 jenkins/scripts/cbts/main.py ${cbtsInputLocal} > /dev/null 2>&1 || true"
} catch (Exception e) {
echo "CBTS Layer 3: failed to materialize piggyback payload " +
"(${e.class.simpleName}: ${e.message}); falling back to source test-db"
}
}
}
def testDBPath = "${llmSrc}/tests/integration/test_lists/test-db"
Expand Down
22 changes: 14 additions & 8 deletions jenkins/scripts/cbts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,19 @@ Decision JSON:
`cbts_test_db/` is written on the L0_MergeRequest agent and is not
available to downstream `L0_Test-*` pods. To regenerate it per stage:

1. `getCbtsResult` stores the input JSON in `result.cbts_input_json`,
which rides along inside `testFilter`.
2. `renderTestDB` on the stage agent writes it to a temp file and re-runs
`main.py`. Output is deterministic, so each agent gets the same
`cbts_test_db/` as L0_MergeRequest produced.

If `cbts_input_json` exceeds 256 KB the piggyback is dropped; Layer 3 falls
1. `getCbtsResult` base64-encodes the input JSON and stores it in
`result.cbts_input_json_b64`, which rides along inside `testFilter`.
Encoding is mandatory: the raw payload contains PR diff text which can
include `${...}` or `{...}` fragments (Python f-strings, shell vars, etc.)
that the Jenkins tokenmacro plugin tries to evaluate when the parent
serializes `globalVars` for `Parameterized-Remote-Trigger`, raising
`MacroEvaluationException` and blocking test dispatch.
2. `renderTestDB` on the stage agent decodes `cbts_input_json_b64`, writes
it to a temp file, and re-runs `main.py`. Output is deterministic, so
each agent gets the same `cbts_test_db/` as L0_MergeRequest produced.

If `cbts_input_json_b64` exceeds 256 KB (post-encoding, the size that
actually travels over the wire) the piggyback is dropped; Layer 3 falls
back to the source test-db on each stage agent. Layer 2 still applies.

## Split-collapse heuristic (Layer 2.5)
Expand Down Expand Up @@ -279,7 +285,7 @@ CBTS defers to the existing filter chain when:
YAML edit)
- Combined scope is `None` (incompatible mix)
- Layer 3 narrowing would empty a block — block keeps original tests
- `cbts_input_json` exceeds 256 KB — Layer 3 falls back per stage
- `cbts_input_json_b64` (post-encoding) exceeds 256 KB — Layer 3 falls back per stage
- Narrowed YAML missing/empty on a stage agent — renderTestDB falls back

Every fallback emits an `echo` log line.
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,6 @@ follow_imports = "silent"
explicit_package_bases = true
namespace_packages = true
strict = true
warn_unused_ignores = false
show_error_codes = true
# NB:
# . -> 'tensorrt_llm'
Expand Down
6 changes: 3 additions & 3 deletions tensorrt_llm/_torch/distributed/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
from tensorrt_llm.plugin.plugin import CustomAllReduceHelper

# Feature flag: GEMM→NCCL-window zero-copy (writes GEMM output directly into
# the window buffer so the allreduce needs no extra copy). Off by default
# (0); set TLLM_NCCL_SYMMETRIC_ZERO_COPY=1 to enable.
# the window buffer so the allreduce needs no extra copy). On by default
# (1); set TLLM_NCCL_SYMMETRIC_ZERO_COPY=0 to disable.
# Evaluated once at import — O(1) module-global lookup on every call,
# equivalent to a C++ static-bool cached env var.
_NCCL_SYMMETRIC_ZERO_COPY: bool = (os.environ.get(
"TLLM_NCCL_SYMMETRIC_ZERO_COPY", "0") == "1")
"TLLM_NCCL_SYMMETRIC_ZERO_COPY", "1") == "1")

_thread_local = threading.local()

Expand Down
2 changes: 1 addition & 1 deletion tensorrt_llm/_torch/pyexecutor/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class SampleStateWithMMResult(SampleState[SampleStateTensors, SampleStateTensors


@dataclass(kw_only=True, frozen=True, slots=True)
class RequestGroupKey(Generic[GenericStrategyKeyType]): # type: ignore[misc]
class RequestGroupKey(Generic[GenericStrategyKeyType]):
strategy_key: GenericStrategyKeyType
needs_probs: bool

Expand Down
10 changes: 5 additions & 5 deletions tensorrt_llm/commands/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
from tensorrt_llm.mapping import CpType
from tensorrt_llm.serve import OpenAIDisaggServer, OpenAIServer
from tensorrt_llm.serve.tool_parser import ToolParserFactory
from tensorrt_llm.serve.tool_parser.tool_parser_factory import \
resolve_auto_tool_parser
from tensorrt_llm.serve.tool_parser.tool_parser_factory import (
MODEL_TYPE_TO_TOOL_PARSER, resolve_auto_tool_parser)
from tensorrt_llm.tools.importlib_utils import import_custom_module_from_dir
from tensorrt_llm.usage import config as _telemetry_config
from tensorrt_llm.visual_gen import VisualGen
Expand Down Expand Up @@ -898,11 +898,11 @@ def serve(
if tool_parser == "auto":
resolved = resolve_auto_tool_parser(model)
if resolved is None:
supported_model_types = ", ".join(
sorted(MODEL_TYPE_TO_TOOL_PARSER.keys()))
raise click.BadParameter(
f"Cannot auto-detect tool parser for model '{model}'. "
f"Supported model types for auto-detection: qwen2, qwen3, "
f"qwen3_moe, qwen3_5, qwen3_5_moe, qwen3_next, deepseek_v3, "
f"deepseek_v32, kimi_k2, kimi_k25, glm4. "
f"Supported model types for auto-detection: {supported_model_types}. "
f"Please specify a parser explicitly: "
f"{list(ToolParserFactory.parsers.keys())}",
param_hint="--tool_parser")
Expand Down
17 changes: 11 additions & 6 deletions tensorrt_llm/serve/openai_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
from tensorrt_llm.serve.responses_utils import \
request_preprocess as responses_api_request_preprocess
from tensorrt_llm.serve.tool_parser.tool_parser_factory import ToolParserFactory
from tensorrt_llm.serve.visual_gen_metrics import \
build_visual_gen_timing_headers
from tensorrt_llm.serve.visual_gen_utils import parse_visual_gen_params
from tensorrt_llm.version import __version__ as VERSION
from tensorrt_llm.visual_gen import VisualGen
Expand Down Expand Up @@ -1945,12 +1947,15 @@ async def openai_image_generation(self, request: ImageGenerationRequest,
"URL mode is not supported for image generation")

latency = time.perf_counter() - image_gen_start # seconds
logger.info(
f"Image {image_id} generated and encoded: "
f"latency={latency:.3f}s generation={getattr(output.metrics, 'generation', 0.0):.3f}s "
f"denoise={getattr(output.metrics, 'denoise', 0.0):.3f}s")

return JSONResponse(content=response.model_dump())
metrics = output.metrics
generation = metrics.generation if metrics is not None else 0.0
denoise = metrics.denoise if metrics is not None else 0.0
logger.info(f"Image {image_id} generated and encoded: "
f"latency={latency:.3f}s generation={generation:.3f}s "
f"denoise={denoise:.3f}s")
headers = build_visual_gen_timing_headers(metrics)

return JSONResponse(content=response.model_dump(), headers=headers)

except Exception as e:
logger.error(traceback.format_exc())
Expand Down
19 changes: 15 additions & 4 deletions tensorrt_llm/serve/openai_video_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from tensorrt_llm.logger import logger
from tensorrt_llm.media.encoding import resolve_video_format
from tensorrt_llm.serve.openai_protocol import VideoGenerationRequest, VideoJob, VideoJobList
from tensorrt_llm.serve.visual_gen_metrics import build_visual_gen_timing_headers
from tensorrt_llm.serve.visual_gen_utils import VIDEO_STORE, parse_visual_gen_params
from tensorrt_llm.visual_gen.params import VisualGenParams

Expand Down Expand Up @@ -79,11 +80,15 @@ async def openai_video_generation_sync(self, raw_request: Request) -> Response:
frame_rate=output.frame_rate or request.fps or params.frame_rate,
)
latency = time.perf_counter() - sync_video_start # seconds
metrics = output.metrics
generation = metrics.generation if metrics is not None else 0.0
denoise = metrics.denoise if metrics is not None else 0.0
logger.info(
f"Video {video_id} generated and encoded: "
f"latency={latency:.3f}s generation={getattr(output.metrics, 'generation', 0.0):.3f}s "
f"denoise={getattr(output.metrics, 'denoise', 0.0):.3f}s"
f"latency={latency:.3f}s generation={generation:.3f}s "
f"denoise={denoise:.3f}s"
)
headers = build_visual_gen_timing_headers(metrics)

# TODO(TRTLLM-11579): the OpenAI Videos API does not yet define a
# multi-file response, so we return only the first video as a file
Expand All @@ -96,6 +101,7 @@ async def openai_video_generation_sync(self, raw_request: Request) -> Response:
actual_output_path,
media_type=media_type,
filename=actual_path.name,
headers=headers,
)

except ValueError as e:
Expand Down Expand Up @@ -271,15 +277,20 @@ async def _generate_video_background(
frame_rate=output.frame_rate or request.fps or params.frame_rate,
)
latency = time.perf_counter() - background_start # seconds
metrics = output.metrics
generation = metrics.generation if metrics is not None else 0.0
denoise = metrics.denoise if metrics is not None else 0.0
logger.info(
f"Video {video_id} async-generated and encoded: "
f"latency={latency:.3f}s generation={getattr(output.metrics, 'generation', 0.0):.3f}s "
f"denoise={getattr(output.metrics, 'denoise', 0.0):.3f}s"
f"latency={latency:.3f}s generation={generation:.3f}s "
f"denoise={denoise:.3f}s"
)
job = await VIDEO_STORE.get(video_id)
if job:
job.status = "completed"
job.completed_at = int(time.time())
# TODO: Expose VisualGen timing metrics for async jobs once the
# OpenAI video job metadata contract includes server timings.
# Store the first path on output_path for single-video
# compatibility, and the full list on output_paths.
job.output_path = str(saved_paths[0])
Expand Down
56 changes: 54 additions & 2 deletions tensorrt_llm/serve/scripts/benchmark_visual_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import asyncio
import gc
import json
import math
import os
import random
import sys
Expand All @@ -59,6 +60,11 @@
load_visual_gen_prompts,
print_visual_gen_results,
)
from tensorrt_llm.serve.visual_gen_metrics import (
SERVER_TIMING_HEADER,
VISUAL_GEN_DENOISE_TIMING,
VISUAL_GEN_GENERATION_TIMING,
)

AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)

Expand Down Expand Up @@ -107,6 +113,41 @@ def _get_headers() -> dict[str, str]:
}


def _parse_server_timing_header(headers: Any) -> dict[str, float]:
"""Parse required VisualGen Server-Timing metrics into seconds.

Online VisualGen perf sanity gates on engine-side generation time, so a
successful response without valid ``Server-Timing`` metadata is treated as
a failed benchmark request instead of silently contributing a zero sample.
"""
value = headers.get(SERVER_TIMING_HEADER)
if value is None:
raise ValueError(f"Missing VisualGen timing response header: {SERVER_TIMING_HEADER}")

timings = {}
for entry in value.split(","):
parts = [part.strip() for part in entry.split(";")]
name = parts[0]
for parameter in parts[1:]:
key, _, parameter_value = parameter.partition("=")
if key.strip() == "dur":
timings[name] = float(parameter_value) / 1000.0
break
return timings


def _get_server_timing_metric(
timings: dict[str, float], name: str, *, require_positive: bool
) -> float:
"""Return a required Server-Timing metric, in seconds."""
if name not in timings:
raise ValueError(f"Missing VisualGen Server-Timing metric: {name}")
timing = timings[name]
if not math.isfinite(timing) or timing < 0 or (require_positive and timing <= 0):
raise ValueError(f"Invalid VisualGen Server-Timing metric {name}: {timing}")
return timing


async def _do_post(
request_input: VisualGenRequestInput,
payload: dict[str, Any],
Expand All @@ -130,6 +171,17 @@ async def _do_post(
await response.read()
output.success = True
output.latency = time.perf_counter() - st
server_timings = _parse_server_timing_header(response.headers)
output.generation = _get_server_timing_metric(
server_timings,
VISUAL_GEN_GENERATION_TIMING,
require_positive=True,
)
output.denoise = _get_server_timing_metric(
server_timings,
VISUAL_GEN_DENOISE_TIMING,
require_positive=False,
)
else:
body = await response.text()
output.error = f"HTTP {response.status}: {body}"
Expand Down Expand Up @@ -314,7 +366,7 @@ def load_prompts(args: argparse.Namespace) -> list[VisualGenSampleRequest]:
def _resolve_num_gpus(args: argparse.Namespace) -> int:
"""Determine the number of GPUs from explicit arg or server config YAML.

Priority: --num-gpus (explicit) > --extra-visual-gen-options YAML > default 1.
Priority: --num-gpus (explicit) > --visual-gen-args YAML > default 1.
"""
if args.num_gpus is not None:
return args.num_gpus
Expand Down Expand Up @@ -575,7 +627,7 @@ def main(args: argparse.Namespace):
type=int,
default=None,
help="Number of GPUs used by the server. Overrides the value inferred "
"from --extra-visual-gen-options. Defaults to 1 if neither is given.",
"from --visual-gen-args. Defaults to 1 if neither is given.",
)

output_group = parser.add_argument_group("Output")
Expand Down
Loading
Loading