✨ 新功能 实现普通节点的一拖多/多对一的分支并行处理#600
Open
swfnotswift wants to merge 5 commits intoModelEngine-Group:mainfrom
Open
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
🔗 相关问题 / Related Issue
Issue 链接 / Issue Link: #{$IssueNumber} 👈👈
📋 变更类型 / Type of Change
📝 变更目的 / Purpose of the Change
解决 Waterflow 普通节点一拖多、多对一、多输入 LLM 节点执行异常而做的代码修改。
📋 主要变更 / Brief Changelog
1. FlowContext.java
修改说明
这一段的目标,是让普通节点一拖多时每条分支都拥有独立的 context 身份。
原来的问题是:同一个上游 context 命中多条出边时,后续链路仍然可能共享同一个 contextId。这样一来,多个分支在持久化、状态推进、批次更新时会互相覆盖,看起来像“只跑了一条分支”或者“后到的分支把先到的分支覆盖掉了”。
本次新增的
fork()本质上是一个语义化入口,表示“基于当前上下文复制一个新分支”。convertData(data)用于生成一个新的 context,对外表现为新分支新分支会继承原上下文的大部分运行元数据
但它会保留新的 contextId,且
previous指向原 contextnextPositionId也会一并继承,确保分支在后续送边时不会丢目标信息copyContext(...)被抽出来之后,两个convertData(...)的职责就清晰了:convertData(data)用于“派生一个新身份”convertData(data, id)用于“只替换 data,但保留原身份”这两个入口分别对应本次修复中的两种完全不同的场景:
一拖多分支复制,需要新身份
When.convert或执行期临时合并输入,只需要替换 data,不需要换 identity2. From.java
修改说明
这一段的目标,是在真正发往多条边时,把分支拆成独立上下文,而不是只在概念上“一拖多”。
offer(List<FlowContext<I>> ...)里现在的处理策略是:如果某个 context 只命中一条边,沿用原 context
如果命中多条边,第一条边仍使用原 context
从第二条边开始,对每条边调用
fork()生成新的 branch context这样做有两个好处:
不会把所有分支都强制改成新对象,保留第一条链路的兼容性
又能保证剩余分支拥有独立 contextId,避免互相覆盖
新增的
persistForkedContexts(...)用来解决第二层问题:即使分支 context 在内存里分出来了,如果不先落库、不先加入 trace 的 contextPool,后续When.cache(...)、updateStatus(...)、requestAll(...)仍然会出现查不到、覆盖、丢分支的问题。它做了两件事:
先
updateContextPool(...),把新分支 contextId 加入 trace 可见范围再
save(...),把这些 fork 出来的 context 持久化并且这里加了分布式锁,避免并发分支扩散时 contextPool 更新出现竞争覆盖。
3. To.java: fan-in 配置与 request 过滤
修改说明
这一段的目标,是给多对一节点建立一套明确的“汇聚判定规则”。
本次引入了三个关键概念:
fanInModefanInModeConfiguredmergeKeyGenerator含义分别是:
fanInMode决定多输入节点是“来一条就处理”还是“必须全部到齐再处理”fanInModeConfigured用于区分“用户主动配置”和“框架自动推断”mergeKeyGenerator用于定义哪些输入属于同一组汇聚数据默认 mergeKey 使用
rootId + transId + traceId 集合组合生成,目的是尽量把同一次流程实例里的同一组分支归到一个 key 下。onSubscribe(...)里增加的自动切换逻辑表示:如果当前节点只有一个上游来源,默认
ANY如果当前节点接入多个 distinct from,默认
ALL这保证普通多输入节点在未显式配置时,也能先按“需要汇聚”处理。
4. To.java: 多输入 ready 判断与完整汇聚组选择
修改说明
这一段解决的是多对一节点 request 阶段的两个问题:
候选数据虽然查出来了,但不知道哪些已经真正“到齐”
不能把所有 pending 混在一起直接处理,否则会把不同批次、不同分支组混起来
filterReadyByFanIn(...)的职责是做“到齐判定”。它按 mergeKey 分组,然后统计每个 group 里出现了多少个不同的
position。当 distinct position 数量大于等于当前节点的上游输入数量时,说明这一组输入已经凑齐。这里用
position而不是直接用 context 数量,是因为我们真正关心的是“是否来自不同上游来源”,而不是“来了几条数据”。selectReadyMergeGroup(...)的职责是做“请求期截取”。它不再像默认 filter 那样按 batch 简单取一批,而是:
先按 mergeKey 归组
再找出第一组已经满足完整输入数量的 group
只返回这一组
这样可以避免多组 pending 混在一起被同时拉入一次处理。
markReady(...)则把原来 ready 标记和 ready 过滤的动作收口成一个独立步骤,保证 fan-in 分组判断在前,状态推进在后,顺序更稳定。5. To.java: 处理阶段合并多输入上下文
修改说明
这一段是本次修复里最关键的一层。
前面的 request 修复只能保证“多对一节点能捞到多条上游输入”,但还不能保证“节点执行时真的把这些输入当作一组来消费”。
原问题正是出在这里:
request 阶段已经能看到两条输入
但普通节点的
MAPPING或 LLM 节点底层的PRODUCING执行时,仍然按单条 context 分别处理于是某一条 context 里只包含自己这一侧的数据,另一侧引用自然就是 null
所以
onProcess(...)现在增加了一步:先调用
mergeProcessInputs(pre)再把
processInputs交给真正的处理器执行mergeProcessInputs(...)当前只在以下条件下生效:fanInMode == ALL输入条数大于 1
当前处理模式属于
MAPPING、FLATMAPPING或PRODUCING输入 data 都是
FlowData其中把
PRODUCING也纳入是这次后续补充的关键,因为普通llmNodeState实际走的是FlowStateNode -> Node(... this::stateProduce ...) -> To.PRODUCING这条链路,而不是MAPPING。mergeFlowData(...)则负责把多条FlowContext中的三类核心数据拼起来:businessDatacontextDatapassData合并时使用
FlowUtil.mergeMaps(...),这样下游节点最终看到的是一个完整的、同时包含多个上游输出的FlowData。最终效果是:对于需要同时引用多个上游节点输出的 LLM 节点、代码节点或普通 map 类节点,执行器真正拿到的是“合并后的完整输入”,而不是某一侧的单独 context。
6. To.java: 各处理模式接入新的 request 逻辑
修改说明
这一段解决的是“不同处理模式下 request 路径不一致”的问题。
如果只在某一个模式里改 request 过滤,其它模式仍然会沿用旧逻辑,那么:
reduce 节点可能正确
producing 节点仍然不对
mapping 节点还是只能捞到一边
所以现在统一改成:
PRODUCING.requestAll(...)调用requestFilter(to.postFilter())MAPPING.requestAll(...)调用requestFilter(to.defaultFilter())FLATMAPPING、REDUCING继续复用前者逻辑这样不同处理模式都会在
ALL场景下走同一套“完整汇聚组筛选”策略。filterReady(...)也被调整为先filterReadyByFanIn(...)再markReady(...),避免旧逻辑里“先标 ready、后分组”造成的状态不一致问题。7. To.java: 处理线程退出前的并发冲突补偿
修改说明
这一段解决的是处理线程退出窗口期的竞态问题。
原场景是:
处理线程判断当前没有 ready 数据,准备退出
就在退出前后,新数据恰好写入边上
外部又因为线程标记未及时变化,没有重新拉起处理
结果就是:新数据挂在边上,但没有线程再去消费。
handleProcessConcurrentConflict(...)的做法是:在线程准备退出时,再检查一次当前 pending
对这些 pending 再走一遍 ready 判定
只有当确实存在 ready 数据时,才重新触发
accept(ProcessType.PROCESS, pending)这个改动的重点在于“只对真正 ready 的数据重新拉起处理”,避免之前那种无条件自唤醒导致的空转或死循环。
🧪 验证变更 / Verifying this Change
测试步骤 / Test Steps
测试覆盖 / Test Coverage
📸 截图 / Screenshots
✅ 贡献者检查清单 / Contributor Checklist
请确保你的 Pull Request 符合以下要求 / Please ensure your Pull Request meets the following requirements:
基本要求 / Basic Requirements:
代码质量 / Code Quality:
测试要求 / Testing Requirements:
mvn -B clean package -Dmaven.test.skip=true,npm install --force && npm run build:pro/ Basic checks passmvn clean install/ Unit tests pass文档和兼容性 / Documentation and Compatibility:
📋 附加信息 / Additional Notes
审查者注意事项 / Reviewer Notes: