Add DeepSeek V4 2-rank EP MoE single-layer program#426
Conversation
Introduces models/deepseek/v4/moe_ep.py: an end-to-end MoE decode program assembled as @pl.program with explicit cross-rank dispatch/combine over an HCCL window scratch (mirrors the test_l3_ep_dispatch_combine protocol with four payload channels: x_i8 / scale / weight / r_route). Supporting refactors so single-card kernels compose under @pl.inline inside @pl.function(InCore) callers: - expert_routed: take recv_weights, apply per-row routing weight on the w2 output. combine then becomes a pure scatter + dense sum, so the cross-rank reduce in moe_ep saves one window channel. - combine: drop recv_weights; sum only. - gate: route over the full global expert space when config.EP_ROUTING_GLOBAL is True (default False keeps legacy single-card shard-local behavior). - hc_pre / hc_post / gate / expert_shared / expert_routed: expose *_inline aliases via pl.inline(<fn>._func) and return their pl.Out tensor so inline call expressions parse. - gate: rename per-spmd-loop base offset (t1_g / t1_h / t1_s) and hc_pre: rename rowN → rowN_p after fillpad — both work around the strict-SSA InCore-inline parser (pypto #1603). config: - DEMO preset n_routed_experts 8 → 16 (matches 2-rank EP demo with N_LOCAL=4 per rank, TOPK=2). - Add EP_ROUTING_GLOBAL flag; moe_ep flips it before importing sub-kernels. moe.py: thread recv_weights into expert_routed and drop it from combine to match the new contract.
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis PR refactors DeepSeek-V4 MoE weight application and introduces a complete distributed 2-rank pipeline. Routing weights now apply in ChangesWeight Routing Refactor
Routing Modes and Configuration
Inline Kernel Composition
End-to-End EP MoE Pipeline
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request refactors the DeepSeek-V4 MoE implementation to support multi-card Expert Parallelism (EP) by applying routing weights row-wise in expert_routed instead of combine, and introduces a 2-rank end-to-end EP single-layer decode program in moe_ep.py. Feedback highlights critical synchronization and initialization issues in moe_ep.py where allocated window buffers (pub_counts, count_done, and combine_done) are not zero-initialized, potentially causing data corruption or premature barrier fall-throughs. Additionally, evaluating N_EXPERTS at module import time in gate.py based on mutable global configuration poses a high risk of module-caching bugs.
| # ---------- histogram: scalar histogram on indices ---------- | ||
| send_counts = pl.array.create(N_RANKS * N_LOCAL, pl.INT32) |
There was a problem hiding this comment.
The window buffers pub_counts and count_done are allocated but not zero-initialized. Since notifications are only sent when v != 0, any cells where v == 0 will contain uninitialized garbage values. This will corrupt the prefix sum offsets and counts, leading to incorrect routing and potential crashes. Additionally, uninitialized synchronization cells in count_done can cause the barrier wait to fall through prematurely. Please initialize both buffers to 0 at the start of dispatch_step.
# ---------- initialization ----------
for r0 in pl.range(N_RANKS * N_RANKS):
for e0 in pl.range(N_LOCAL):
pl.write(pub_counts, [r0, e0], 0)
for r0 in pl.range(N_RANKS):
pl.write(count_done, [r0, 0], 0)
# ---------- histogram: scalar histogram on indices ----------
send_counts = pl.array.create(N_RANKS * N_LOCAL, pl.INT32)| ) -> pl.Tensor[[B, S, D], pl.BF16]: | ||
| # ---------- push: TPUT recv_y rows to peer's routed_y_buf ---- |
There was a problem hiding this comment.
The combine_done window buffer is not zero-initialized. If it contains stale or garbage values >= 1, the barrier wait will succeed immediately without waiting for peers, causing a race condition. Please initialize combine_done to 0 at the start of combine_step.
| ) -> pl.Tensor[[B, S, D], pl.BF16]: | |
| # ---------- push: TPUT recv_y rows to peer's routed_y_buf ---- | |
| ) -> pl.Tensor[[B, S, D], pl.BF16]: | |
| for r0 in pl.range(N_RANKS): | |
| pl.write(combine_done, [r0, 0], 0) | |
| # ---------- push: TPUT recv_y rows to peer's routed_y_buf ---- |
| # over its own [n_routed_experts // EP_WORLD_SIZE] shard. | ||
| # EP_ROUTING_GLOBAL=True (used by moe_ep.py): every rank routes over the full | ||
| # global expert set so dispatch can fan tokens across ranks. | ||
| N_EXPERTS = M.n_routed_experts if _cfg.EP_ROUTING_GLOBAL else M.n_routed_experts // EP_WORLD_SIZE |
There was a problem hiding this comment.
Evaluating N_EXPERTS at module import time based on mutable global configuration (_cfg.EP_ROUTING_GLOBAL) is highly prone to module-caching bugs. If gate.py is imported elsewhere first (e.g., in moe.py or unit tests), subsequent imports of gate in moe_ep.py will reuse the cached module from sys.modules without re-evaluating N_EXPERTS, leading to incorrect shapes and runtime errors. Consider refactoring the design to avoid module-level constants that depend on mutable global state, or ensure that modules are reloaded when configuration changes.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@models/deepseek/v4/moe_ep.py`:
- Line 1072: The assignment of a lambda to the name out is triggering Ruff E731;
replace that lambda with a small helper function named out that accepts (name,
shape, dtype) and returns TensorSpec(name, shape, dtype, is_output=True). Update
the single-line lambda at the current declaration to a proper def out(name,
shape, dtype): return TensorSpec(name, shape, dtype, is_output=True) so all
existing uses of out continue to work unchanged and satisfy the linter.
- Around line 1047-1049: The three statements currently use semicolon-chained
multiple append calls (routed_w1_i8_list.append(w1_i8);
routed_w1_s_list.append(w1_s) etc.), which triggers E702; split each
semicolon-separated append into its own line so each list append call is on a
separate statement (e.g., routed_w1_i8_list.append(w1_i8) on one line and
routed_w1_s_list.append(w1_s) on the next), doing the same for routed_w3_* and
routed_w2_* append pairs to satisfy the linter.
- Around line 24-28: moe_ep.py currently mutates config after some sub-kernel
modules may already be cached, causing stale FLASH/EP_* constants in
hc_pre_inline, hc_post_inline, gate_inline, expert_shared_inline and
expert_routed_inline; move the config overrides so they happen before any import
of those sub-kernel modules (or, if imports must be earlier, perform
importlib.reload on each of those modules after applying overrides) so their
module-level bindings (FLASH, EP_WORLD_SIZE, B/S/D/RECV_MAX) are computed
against the correct values; also fix the lint issues by replacing the
multi-statement semicolon line(s) like routed_w*_i8_list.append(...); ... with
separate statements on their own lines and replace `out = lambda ...` with a
proper def out(...) function to satisfy E702 and E731.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c6629ba2-2b5d-45bf-829d-7310ac329acd
📒 Files selected for processing (9)
models/deepseek/v4/combine.pymodels/deepseek/v4/config.pymodels/deepseek/v4/expert_routed.pymodels/deepseek/v4/expert_shared.pymodels/deepseek/v4/gate.pymodels/deepseek/v4/hc_post.pymodels/deepseek/v4/hc_pre.pymodels/deepseek/v4/moe.pymodels/deepseek/v4/moe_ep.py
| # === Module preamble: override config BEFORE importing sub-kernels ========== | ||
| # Sub-kernels (hc_pre / gate / expert_routed / ...) bind preset constants at | ||
| # module-import time via ``from config import FLASH as M``. By the time those | ||
| # modules execute their first line, the overrides below must already be in | ||
| # place — otherwise they'd capture FLASH and EP_WORLD_SIZE=16 instead. |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find repo paths that can import the EP sub-kernels or moe_ep in the same process.
# If any of the sub-kernels can be imported before moe_ep, this import-order dependency is real.
rg -n -C2 --type=py 'from\s+(?:models\.deepseek\.v4\.)?(hc_pre|hc_post|gate|expert_shared|expert_routed|moe_ep)\s+import' .
rg -n -C2 --type=py 'import\s+(?:models\.deepseek\.v4\.)?(hc_pre|hc_post|gate|expert_shared|expert_routed|moe_ep)\b' .Repository: hw-native-sys/pypto-lib
Length of output: 8706
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Inspect moe_ep.py around the config override + imports
sed -n '1,120p' models/deepseek/v4/moe_ep.py
# 2) Find where the *_inline kernels are defined / imported from
rg -n "hc_pre_inline|hc_post_inline|gate_inline|expert_shared_inline|expert_routed_inline" . --glob='*.py'
# 3) Inspect those inline modules for "from config import ..." patterns that bind constants at import time
# (only check the files that define the inline symbols)
python3 - <<'PY'
import subprocess, re, os, textwrap
root='.'
# get candidate python files that mention the inline symbols
cmd=["bash","-lc", "rg -l \"hc_pre_inline|hc_post_inline|gate_inline|expert_shared_inline|expert_routed_inline\" . --glob='*.py'"]
out=subprocess.check_output(cmd, text=True)
files=[f.strip() for f in out.splitlines() if f.strip()]
print("INLINE_FILES:", len(files))
for f in files[:50]:
print(f)
PYRepository: hw-native-sys/pypto-lib
Length of output: 7079
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Check Ruff-flagged lines in moe_ep.py
sed -n '1020,1090p' models/deepseek/v4/moe_ep.py
# 2) Inspect whether sub-kernels bind config constants at import time.
# Look for "from config import FLASH as M" / module-level uses of EP_* / RECV_MAX / etc.
for f in models/deepseek/v4/hc_pre.py models/deepseek/v4/gate.py models/deepseek/v4/expert_shared.py models/deepseek/v4/expert_routed.py models/deepseek/v4/hc_post.py; do
echo "===== $f ====="
rg -n "from config import|FLASH as|EP_WORLD_SIZE|EP_ROUTING_GLOBAL|RECV_MAX" "$f" || true
sed -n '1,120p' "$f"
done
# 3) Find other modules that import moe_ep, and see if they also import those sub-kernels.
rg -n --type=py "(from .*moe_ep import|import .*moe_ep)" models/deepseek/v4 || true
rg -n --type=py "from\s+(?:hc_pre|hc_post|gate|expert_shared|expert_routed)\s+import" models/deepseek/v4 || true
# 4) (Narrow) find within top-level scripts any combined imports that could load sub-kernels before moe_ep.
rg -n --type=py "moe_ep" . || trueRepository: hw-native-sys/pypto-lib
Length of output: 40776
🏁 Script executed:
#!/bin/bash
set -euo pipefail
nl -ba models/deepseek/v4/moe_ep.py | sed -n '1025,1085p'Repository: hw-native-sys/pypto-lib
Length of output: 110
🏁 Script executed:
#!/bin/bash
set -euo pipefail
python3 - <<'PY'
from pathlib import Path
p=Path("models/deepseek/v4/moe_ep.py")
lines=p.read_text().splitlines()
def show(a,b):
for i in range(a,b+1):
if 1<=i<=len(lines):
print(f"{i}:{lines[i-1]}")
print("=== moe_ep.py:55-85 ===")
show(55,85)
print("=== moe_ep.py:1035-1085 ===")
show(1035,1085)
PYRepository: hw-native-sys/pypto-lib
Length of output: 5024
Fix import-time config binding for EP sub-kernels (moe_ep.py)
moe_ep.py mutates config before importing hc_pre_inline/hc_post_inline/gate_inline/expert_shared_inline/expert_routed_inline (55-74, 69-73), but those kernel modules bind constants at module import time (e.g., from config import FLASH as M, ... and derive module-level B/S/D/RECV_MAX globals). If any of these modules were imported earlier in the same Python process (possible since models/deepseek/v4/moe.py and the decode entrypoints import hc_pre/hc_post/gate/expert_* at top-level), Python will reuse cached modules and silently keep stale FLASH/EP-related values, mixing incompatible shape/routing contracts.
- Ruff:
routed_w*_i8_list.append(...); ...violates E702 on 1047-1049;out = lambda ...violates E731 on 1072.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@models/deepseek/v4/moe_ep.py` around lines 24 - 28, moe_ep.py currently
mutates config after some sub-kernel modules may already be cached, causing
stale FLASH/EP_* constants in hc_pre_inline, hc_post_inline, gate_inline,
expert_shared_inline and expert_routed_inline; move the config overrides so they
happen before any import of those sub-kernel modules (or, if imports must be
earlier, perform importlib.reload on each of those modules after applying
overrides) so their module-level bindings (FLASH, EP_WORLD_SIZE, B/S/D/RECV_MAX)
are computed against the correct values; also fix the lint issues by replacing
the multi-statement semicolon line(s) like routed_w*_i8_list.append(...); ...
with separate statements on their own lines and replace `out = lambda ...` with
a proper def out(...) function to satisfy E702 and E731.
| routed_w1_i8_list.append(w1_i8); routed_w1_s_list.append(w1_s) | ||
| routed_w3_i8_list.append(w3_i8); routed_w3_s_list.append(w3_s) | ||
| routed_w2_i8_list.append(w2_i8); routed_w2_s_list.append(w2_s) |
There was a problem hiding this comment.
Split these semicolon-chained appends.
Ruff is already flagging E702 here, so this will likely fail lint as written.
Suggested fix
- routed_w1_i8_list.append(w1_i8); routed_w1_s_list.append(w1_s)
- routed_w3_i8_list.append(w3_i8); routed_w3_s_list.append(w3_s)
- routed_w2_i8_list.append(w2_i8); routed_w2_s_list.append(w2_s)
+ routed_w1_i8_list.append(w1_i8)
+ routed_w1_s_list.append(w1_s)
+ routed_w3_i8_list.append(w3_i8)
+ routed_w3_s_list.append(w3_s)
+ routed_w2_i8_list.append(w2_i8)
+ routed_w2_s_list.append(w2_s)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| routed_w1_i8_list.append(w1_i8); routed_w1_s_list.append(w1_s) | |
| routed_w3_i8_list.append(w3_i8); routed_w3_s_list.append(w3_s) | |
| routed_w2_i8_list.append(w2_i8); routed_w2_s_list.append(w2_s) | |
| routed_w1_i8_list.append(w1_i8) | |
| routed_w1_s_list.append(w1_s) | |
| routed_w3_i8_list.append(w3_i8) | |
| routed_w3_s_list.append(w3_s) | |
| routed_w2_i8_list.append(w2_i8) | |
| routed_w2_s_list.append(w2_s) |
🧰 Tools
🪛 Ruff (0.15.14)
[error] 1047-1047: Multiple statements on one line (semicolon)
(E702)
[error] 1048-1048: Multiple statements on one line (semicolon)
(E702)
[error] 1049-1049: Multiple statements on one line (semicolon)
(E702)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@models/deepseek/v4/moe_ep.py` around lines 1047 - 1049, The three statements
currently use semicolon-chained multiple append calls
(routed_w1_i8_list.append(w1_i8); routed_w1_s_list.append(w1_s) etc.), which
triggers E702; split each semicolon-separated append into its own line so each
list append call is on a separate statement (e.g.,
routed_w1_i8_list.append(w1_i8) on one line and routed_w1_s_list.append(w1_s) on
the next), doing the same for routed_w3_* and routed_w2_* append pairs to
satisfy the linter.
| sw2_i8 = sw2_i8.unsqueeze(0).expand(N_RANKS, -1, -1).contiguous() | ||
| sw2_s = sw2_s.unsqueeze(0).expand(N_RANKS, -1).contiguous() | ||
|
|
||
| out = lambda name, shape, dtype: TensorSpec(name, shape, dtype, is_output=True) |
There was a problem hiding this comment.
Replace the out lambda with a small helper.
Ruff E731 flags assigning a lambda here.
Suggested fix
- out = lambda name, shape, dtype: TensorSpec(name, shape, dtype, is_output=True)
+ def out(name, shape, dtype):
+ return TensorSpec(name, shape, dtype, is_output=True)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| out = lambda name, shape, dtype: TensorSpec(name, shape, dtype, is_output=True) | |
| def out(name, shape, dtype): | |
| return TensorSpec(name, shape, dtype, is_output=True) |
🧰 Tools
🪛 Ruff (0.15.14)
[error] 1072-1072: Do not assign a lambda expression, use a def
Rewrite out as a def
(E731)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@models/deepseek/v4/moe_ep.py` at line 1072, The assignment of a lambda to the
name out is triggering Ruff E731; replace that lambda with a small helper
function named out that accepts (name, shape, dtype) and returns
TensorSpec(name, shape, dtype, is_output=True). Update the single-line lambda at
the current declaration to a proper def out(name, shape, dtype): return
TensorSpec(name, shape, dtype, is_output=True) so all existing uses of out
continue to work unchanged and satisfy the linter.
Summary
models/deepseek/v4/moe_ep.py, an end-to-end MoE decode program assembled as@pl.programwith explicit cross-rank dispatch/combine over an HCCL window scratch (four payload channels:x_i8,scale,weight,r_route).expert_routed,combine,gate,hc_pre,hc_post,expert_shared) so they compose under@pl.inlineinside@pl.function(InCore)callers — exposes*_inlinealiases and adjusts SSA naming for the strict InCore-inline parser.expert_routednow applies the per-row routing weight on thew2output, turningcombineinto a pure scatter+sum and saving one window channel in the EP reduce.EP_ROUTING_GLOBALconfig flag sogatecan route over the full global expert space (default off keeps legacy single-card shard-local behavior); bumps DEMO presetn_routed_experts8 → 16 to match the 2-rank EP demo (N_LOCAL=4,TOPK=2).moe.pyto threadrecv_weightsthroughexpert_routedand drop it fromcombineto match the new contract.