Add generation end-to-end profiling#17
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis pull request adds comprehensive Chrome trace profiling throughout the LLM serving stack. It introduces a new ChangesProfiling System Implementation and Integration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive profiling framework (python/profile) for PyPTO, including a Chrome trace-event recorder, environment-based configuration, and trace merging capabilities. It integrates this profiling system across the codebase, wrapping key execution steps (such as model initialization, prefill, decode, serving requests, and scheduler operations) in profile spans, and tracks host and device wall times for kernel execution. The review feedback focuses on improving the robustness, performance, and accuracy of the profiling framework. Key suggestions include optimizing I/O performance by removing aggressive flushing and making writes fail-silent, handling malformed JSON lines gracefully during trace merging, supporting proper task-level visualization in asyncio environments by using task IDs as thread IDs, preventing worker processes from deleting other workers' trace fragments, and improving cross-platform compatibility for default process names.
| def _write(self, event: dict[str, Any]) -> None: | ||
| if self._fh is None: | ||
| return | ||
| line = json.dumps(event, separators=(",", ":"), default=str) | ||
| with self._lock: | ||
| if self._fh is not None: | ||
| self._fh.write(line) | ||
| self._fh.write("\n") | ||
| self._fh.flush() |
There was a problem hiding this comment.
In a high-frequency profiling context (such as recording every kernel execution span or duration), calling self._fh.flush() on every single write forces a disk write/syscall every time, introducing a massive performance bottleneck. Additionally, any unhandled I/O exceptions (e.g., disk full) during _write could crash the main model execution flow. We should remove the aggressive flushing (letting the OS/Python file buffering handle it efficiently, since close() already flushes and closes the file at the end) and wrap the write operation in a try-except block to ensure profiling is fail-silent.
| def _write(self, event: dict[str, Any]) -> None: | |
| if self._fh is None: | |
| return | |
| line = json.dumps(event, separators=(",", ":"), default=str) | |
| with self._lock: | |
| if self._fh is not None: | |
| self._fh.write(line) | |
| self._fh.write("\n") | |
| self._fh.flush() | |
| def _write(self, event: dict[str, Any]) -> None: | |
| if self._fh is None: | |
| return | |
| try: | |
| line = json.dumps(event, separators=(",", ":"), default=str) | |
| with self._lock: | |
| if self._fh is not None: | |
| self._fh.write(line) | |
| self._fh.write("\n") | |
| except Exception: | |
| pass |
| for line in f: | ||
| stripped = line.strip() | ||
| if not stripped: | ||
| continue | ||
| events.append(json.loads(stripped)) |
There was a problem hiding this comment.
If a process crashed or was terminated abruptly, its last line in the fragment file might be incomplete or malformed, causing json.loads to raise a json.JSONDecodeError and preventing the entire trace from being merged. We should wrap json.loads in a try-except block to handle malformed lines gracefully and continue merging the rest of the trace.
| for line in f: | |
| stripped = line.strip() | |
| if not stripped: | |
| continue | |
| events.append(json.loads(stripped)) | |
| for line in f: | |
| stripped = line.strip() | |
| if not stripped: | |
| continue | |
| try: | |
| events.append(json.loads(stripped)) | |
| except json.JSONDecodeError: | |
| continue |
| def get_profiler(*, process_name: str | None = None) -> ProfileRecorder: | ||
| """Return the process-local recorder configured from SA_PROFILE_* envs.""" | ||
| global _profiler, _profiler_pid | ||
| pid = os.getpid() | ||
| if _profiler is not None and _profiler_pid == pid: | ||
| return _profiler |
There was a problem hiding this comment.
Since get_profiler caches the profiler instance in the global _profiler variable, any subsequent calls to get_profiler with a different process_name (such as the transition from pypto-serving to pypto-serving-api in run_serve) will silently ignore the new process name. We should allow updating the process name and writing a new metadata event if a different process_name is provided.
| def get_profiler(*, process_name: str | None = None) -> ProfileRecorder: | |
| """Return the process-local recorder configured from SA_PROFILE_* envs.""" | |
| global _profiler, _profiler_pid | |
| pid = os.getpid() | |
| if _profiler is not None and _profiler_pid == pid: | |
| return _profiler | |
| def get_profiler(*, process_name: str | None = None) -> ProfileRecorder: | |
| """Return the process-local recorder configured from SA_PROFILE_* envs.""" | |
| global _profiler, _profiler_pid | |
| pid = os.getpid() | |
| if _profiler is not None and _profiler_pid == pid: | |
| if process_name is not None and _profiler.process_name != process_name: | |
| _profiler.process_name = process_name | |
| _profiler.metadata("process_name", process_name, tid=0) | |
| return _profiler |
| def _ensure_thread_metadata(self, tid: int) -> None: | ||
| if tid in self._thread_names: | ||
| return | ||
| with self._lock: | ||
| if tid in self._thread_names: | ||
| return | ||
| self._thread_names.add(tid) | ||
| self.metadata("thread_name", threading.current_thread().name, tid=tid) |
There was a problem hiding this comment.
In an asyncio application (like the serving server), all concurrent requests run on the same thread, leading to overlapping, non-nested spans on the same tid in Chrome Trace Viewer, which makes the trace unreadable. We should use the asyncio task ID as the tid when running inside an active event loop to separate concurrent async tasks into their own rows.
| def _ensure_thread_metadata(self, tid: int) -> None: | |
| if tid in self._thread_names: | |
| return | |
| with self._lock: | |
| if tid in self._thread_names: | |
| return | |
| self._thread_names.add(tid) | |
| self.metadata("thread_name", threading.current_thread().name, tid=tid) | |
| def _get_tid_and_name(self) -> tuple[int, str]: | |
| try: | |
| import asyncio | |
| task = asyncio.current_task() | |
| if task is not None: | |
| return id(task), task.get_name() | |
| except (ImportError, AttributeError): | |
| pass | |
| return threading.get_ident(), threading.current_thread().name | |
| def _ensure_thread_metadata(self, tid: int, name: str) -> None: | |
| if tid in self._thread_names: | |
| return | |
| with self._lock: | |
| if tid in self._thread_names: | |
| return | |
| self._thread_names.add(tid) | |
| self.metadata("thread_name", name, tid=tid) |
| tid = threading.get_ident() | ||
| self._ensure_thread_metadata(tid) |
There was a problem hiding this comment.
Update span to resolve the tid and name using the new _get_tid_and_name helper to support proper task-level visualization in asyncio environments.
| tid = threading.get_ident() | |
| self._ensure_thread_metadata(tid) | |
| tid, name = self._get_tid_and_name() | |
| self._ensure_thread_metadata(tid, name) |
| tid = threading.get_ident() | ||
| self._ensure_thread_metadata(tid) |
There was a problem hiding this comment.
Update instant to resolve the tid and name using the new _get_tid_and_name helper to support proper task-level visualization in asyncio environments.
| tid = threading.get_ident() | |
| self._ensure_thread_metadata(tid) | |
| tid, name = self._get_tid_and_name() | |
| self._ensure_thread_metadata(tid, name) |
| tid = threading.get_ident() | ||
| self._ensure_thread_metadata(tid) |
There was a problem hiding this comment.
Update duration to resolve the tid and name using the new _get_tid_and_name helper to support proper task-level visualization in asyncio environments.
| tid = threading.get_ident() | |
| self._ensure_thread_metadata(tid) | |
| tid, name = self._get_tid_and_name() | |
| self._ensure_thread_metadata(tid, name) |
| config = load_profile_config() | ||
| if config.enabled and _MAIN_PID_ENV not in os.environ: | ||
| os.environ[_MAIN_PID_ENV] = str(pid) | ||
| _prepare_run(config) |
There was a problem hiding this comment.
If get_profiler is not called in the main process before spawning workers, the first worker to call it will run _prepare_run and potentially delete other workers' fragments. We should explicitly check if we are in the main process (e.g., using multiprocessing.current_process().name == 'MainProcess') before unlinking.
| config = load_profile_config() | |
| if config.enabled and _MAIN_PID_ENV not in os.environ: | |
| os.environ[_MAIN_PID_ENV] = str(pid) | |
| _prepare_run(config) | |
| config = load_profile_config() | |
| import multiprocessing | |
| is_main = multiprocessing.current_process().name == "MainProcess" | |
| if config.enabled and is_main and _MAIN_PID_ENV not in os.environ: | |
| os.environ[_MAIN_PID_ENV] = str(pid) | |
| _prepare_run(config) |
| def _default_process_name() -> str: | ||
| return Path(os.environ.get("_", "python")).name or "python" |
There was a problem hiding this comment.
Using os.environ.get('_') is Unix-specific and often just returns 'python'. Using sys.argv[0] as a fallback or primary source would provide a much more descriptive default process name (e.g., the script name) across both Unix and Windows.
| def _default_process_name() -> str: | |
| return Path(os.environ.get("_", "python")).name or "python" | |
| def _default_process_name() -> str: | |
| import sys | |
| if sys.argv and sys.argv[0]: | |
| return Path(sys.argv[0]).stem | |
| return Path(os.environ.get("_", "python")).name or "python" |
There was a problem hiding this comment.
🧹 Nitpick comments (3)
python/core/async_engine.py (1)
188-218: ⚡ Quick winSpan scope is narrower than its name suggests.
profile_span("AsyncLLMEngine.add_request", ...)only wrapsself.tokenizer.encode(prompt); it closes before the request is built, queued, or streamed. The recorded duration reflects tokenization only, which is misleading under this name. Either rename the span to reflect tokenization, or widen the scope to cover request construction/queueing.♻️ Option: rename to reflect actual scope
- with profile_span( - "AsyncLLMEngine.add_request", - cat="serving", - args={"request_id": request_id, "max_new_tokens": config.max_new_tokens}, - ): - prompt_token_ids = self.tokenizer.encode(prompt) + with profile_span( + "AsyncLLMEngine.tokenize", + cat="serving", + args={"request_id": request_id, "max_new_tokens": config.max_new_tokens}, + ): + prompt_token_ids = self.tokenizer.encode(prompt)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/core/async_engine.py` around lines 188 - 218, The profiling span named "AsyncLLMEngine.add_request" currently only wraps tokenizer.encode(prompt) which mislabels the measured work; either expand the profile_span to include request construction and queueing (wrap from the tokenizer.encode call through creation of Request, storing _request_contexts[request_id], and scheduler.add_request) or rename the span to something like "AsyncLLMEngine.tokenize" to reflect it only measures tokenization; update the span boundaries around profile_span/profile_instant so the timing covers the intended calls (tokenizer.encode, Request(...), _request_contexts assignment, and scheduler.add_request) or change the span name accordingly to avoid misleading metrics.python/core/pypto_executor.py (1)
51-74: 💤 Low valueInconsistent span naming convention.
register_modeluses"PyptoExecutor.register_model"while the other two use"executor.run_prefill"/"executor.run_decode". Aligning them (e.g.PyptoExecutor.run_prefill/run_decode) keeps trace grouping consistent with the rest of the codebase'sClass.methodspan names.♻️ Proposed naming alignment
- with profile_span( - "executor.run_prefill", + with profile_span( + "PyptoExecutor.run_prefill", cat="executor", args={"model_id": model.config.model_id, "batch_size": len(batch.request_ids)}, ):- with profile_span( - "executor.run_decode", + with profile_span( + "PyptoExecutor.run_decode", cat="executor", args={"model_id": model.config.model_id, "batch_size": len(batch.request_ids)}, ):🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/core/pypto_executor.py` around lines 51 - 74, The span names are inconsistent: register_model uses "PyptoExecutor.register_model" while run_prefill and run_decode use "executor.run_prefill"/"executor.run_decode"; update the profile_span name arguments in the methods run_prefill and run_decode to "PyptoExecutor.run_prefill" and "PyptoExecutor.run_decode" respectively so all spans follow the Class.method convention (look for profile_span calls inside PyptoExecutor.register_model, PyptoExecutor.run_prefill, and PyptoExecutor.run_decode).python/core/serving_worker.py (1)
74-79: ⚡ Quick winAvoid re-initializing main profiler in in-process mode;
process_nameis likely ignored
get_profileris a per-PID singleton (_profiler/_profiler_pid), so callingget_profiler(process_name=...)again in the main process won’t re-initialize or overwrite the existing profiler/trace metadata.- In in-process mode,
AsyncLLMEngine.startalready initializes the profiler viaprofile_span("AsyncLLMEngine.start")(which callsget_profiler()withoutprocess_name) beforeWorkerProcess.init_device_and_model()callsget_profiler(process_name="serving-worker-..."); thatprocess_namewon’t take effect unless it’s the firstget_profilercall.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/core/serving_worker.py` around lines 74 - 79, The call to get_profiler(process_name=...) in WorkerProcess.init_device_and_model is ineffective in in-process mode because the profiler singleton is already created by AsyncLLMEngine.start (via profile_span -> get_profiler() without a name); remove the redundant get_profiler(process_name=...) call from WorkerProcess.init_device_and_model (or move/ensure the single get_profiler call that sets process_name happens before AsyncLLMEngine.start) so the profiler process_name is set only on the first get_profiler invocation; update code references in WorkerProcess.init_device_and_model and ensure AsyncLLMEngine.start/profile_span remain the canonical initialization path.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@python/core/async_engine.py`:
- Around line 188-218: The profiling span named "AsyncLLMEngine.add_request"
currently only wraps tokenizer.encode(prompt) which mislabels the measured work;
either expand the profile_span to include request construction and queueing
(wrap from the tokenizer.encode call through creation of Request, storing
_request_contexts[request_id], and scheduler.add_request) or rename the span to
something like "AsyncLLMEngine.tokenize" to reflect it only measures
tokenization; update the span boundaries around profile_span/profile_instant so
the timing covers the intended calls (tokenizer.encode, Request(...),
_request_contexts assignment, and scheduler.add_request) or change the span name
accordingly to avoid misleading metrics.
In `@python/core/pypto_executor.py`:
- Around line 51-74: The span names are inconsistent: register_model uses
"PyptoExecutor.register_model" while run_prefill and run_decode use
"executor.run_prefill"/"executor.run_decode"; update the profile_span name
arguments in the methods run_prefill and run_decode to
"PyptoExecutor.run_prefill" and "PyptoExecutor.run_decode" respectively so all
spans follow the Class.method convention (look for profile_span calls inside
PyptoExecutor.register_model, PyptoExecutor.run_prefill, and
PyptoExecutor.run_decode).
In `@python/core/serving_worker.py`:
- Around line 74-79: The call to get_profiler(process_name=...) in
WorkerProcess.init_device_and_model is ineffective in in-process mode because
the profiler singleton is already created by AsyncLLMEngine.start (via
profile_span -> get_profiler() without a name); remove the redundant
get_profiler(process_name=...) call from WorkerProcess.init_device_and_model (or
move/ensure the single get_profiler call that sets process_name happens before
AsyncLLMEngine.start) so the profiler process_name is set only on the first
get_profiler invocation; update code references in
WorkerProcess.init_device_and_model and ensure AsyncLLMEngine.start/profile_span
remain the canonical initialization path.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0b410563-0087-4682-8304-c49712ecbcb7
📒 Files selected for processing (17)
.gitignoreexamples/model/qwen3_14b/npu_generate.pyexamples/model/qwen3_14b/runner/npu_executor.pyexamples/model/qwen3_14b/runner/npu_runner.pypython/cli/main.pypython/core/async_engine.pypython/core/engine.pypython/core/pypto_executor.pypython/core/server.pypython/core/serving_worker.pypython/profile/__init__.pypython/profile/env.pypython/profile/merge.pypython/profile/recorder.pypython/runtime/worker.pytests/test_batching.pytests/test_profile.py
4217cca to
f1112ca
Compare
|
Addressed the review feedback in f1112ca:
Validation:
|
f1112ca to
a82259f
Compare
|
Updated README examples to use the larger PTO2 ring settings:
Reran HTTP serving with 32 tokens using those settings. Result: HTTP 200, clean task exit=0, generated text starts with: " a Chinese company. The company is located in the United States..." |
a82259f to
6a1c232
Compare
|
Rebased on latest origin/main (25edc21) and resolved conflicts against the new block_ids-based serving/runner path. PR head is now 6a1c232. Validation on rebased branch:
128-token profiling:
|
Summary
Validation