Skip to content

Conversation

@CodeCasterX
Copy link
Member

@CodeCasterX CodeCasterX commented Jan 13, 2026

🔗 相关问题 / Related Issue

Issue 链接 / Issue Link: #247

  • 我已经创建了相关 Issue 并进行了讨论 / I have created and discussed the related issue

📋 变更类型 / Type of Change

  • 🐛 Bug 修复 / Bug fix (non-breaking change which fixes an issue)

📝 变更目的 / Purpose of the Change

修复 waterflow 框架在高并发场景下的 NPE 问题。根本原因是并发竞态条件导致:

  1. Fork.join()input.getData() 可能返回 null
  2. To.javawindow.peekAndConsume() 可能返回 null

这些问题在 CI 环境的并发测试中有约 0.1-0.2% 的失败率(1000 次运行中 1-2 次失败)。

📋 主要变更 / Brief Changelog

  • Fork.java:98-100 添加 null 检查,当 inputData == null 时返回 null 等待有效数据
  • To.java:940 添加 null 检查,当 peekedToken == null 时跳过 finishConsume() 调用
  • Tip.java:56-58 保留防御性 null 检查
  • PatternTest.java 新增 shouldStableWhenRunnableParallelUnderConcurrency() 并发稳定性测试(@RepeatedTest(1000))
  • 清理所有诊断代码和冗余注释

🧪 验证变更 / Verifying this Change

测试步骤 / Test Steps

  1. 运行并发测试:mvn test -Dtest=PatternTest#shouldStableWhenRunnableParallelUnderConcurrency
  2. 运行完整测试套件:mvn clean install
  3. 验证 GitHub Actions CI 通过

测试覆盖 / Test Coverage

  • 我已经添加了单元测试 / I have added unit tests
  • 所有现有测试都通过 / All existing tests pass
  • 我已经进行了手动测试 / I have performed manual testing

测试结果:

  • 本地 1000 次并发测试通过(0 failures)
  • GitHub Actions Run #20921425198: 1004 tests, 0 failures, 0 errors
  • 所有现有功能测试保持通过

✅ 贡献者检查清单 / Contributor Checklist

基本要求 / Basic Requirements:

  • 确保有 GitHub Issue 对应这个变更(微小变更如错别字除外)/ Make sure there is a Github issue filed for the change (trivial changes like typos excluded)
  • 你的 Pull Request 只解决一个 Issue,没有包含其他不相关的变更 / Your PR addresses just this issue, without pulling in other changes - one PR resolves one issue
  • PR 中的每个 commit 都有有意义的主题行和描述 / Each commit in the PR has a meaningful subject line and body

代码质量 / Code Quality:

  • 我的代码遵循项目的代码规范 / My code follows the project's coding standards
  • 我已经进行了自我代码审查 / I have performed a self-review of my code
  • 我已经为复杂的代码添加了必要的注释 / I have commented my code, particularly in hard-to-understand areas

测试要求 / Testing Requirements:

  • 我已经编写了必要的单元测试来验证逻辑正确性 / I have written necessary unit-tests to verify the logic correction
  • 当存在跨模块依赖时,我尽量使用了 mock / I have used mocks when cross-module dependencies exist
  • 基础检查通过:mvn -B clean package -Dmaven.test.skip=true / Basic checks pass
  • 单元测试通过:mvn clean install / Unit tests pass

文档和兼容性 / Documentation and Compatibility:

  • 我已经更新了相应的文档 / I have made corresponding changes to the documentation
  • 如果有破坏性变更,我已经在 PR 描述中详细说明 / If there are breaking changes, I have documented them in detail
  • 我已经考虑了向后兼容性 / I have considered backward compatibility

📋 附加信息 / Additional Notes

根本原因分析:

waterflow 框架的流式处理架构在并发场景下存在竞态条件。当多个分支并发执行时,某些分支可能会提前完成并清理上下文,导致后续分支读取到 null 数据。

修复策略:

采用防御性编程,在所有可能为 null 的位置添加检查,确保系统在遇到 null 时能够安全降级处理,而不是抛出 NPE。

后续计划:

考虑在框架层面优化并发数据同步机制,从根本上减少竞态条件的发生。


审查者注意事项 / Reviewer Notes:

  1. 重点关注 Fork.java:98-100 的 null 检查逻辑,确保不会导致死锁或无限等待
  2. 验证 To.java:940 的修复不会影响正常的窗口生命周期管理
  3. 确认新增的并发测试能够有效复现原问题

Closes #247

🤖 Generated with Claude Code

CodeCasterX and others added 18 commits January 12, 2026 20:12
问题背景:
- PatternTest.shouldOkWhenAiFlowWithExampleSelector 在 GitHub Actions 中偶发失败
- 错误: NullPointerException at Tip.merge(Tip.java:121)
- 失败率: 0.5% (5次/1000次运行, 平均每22天一次)
- 只在 GitHub Actions 环境中出现,本地无法稳定复现

测试策略(TDD 红色阶段):
1. shouldReproduceNPEInRunnableParallel (50次重复)
   - 使用 200ms 延迟制造快慢分支,增大竞态窗口
   - 预期:在 GitHub Actions 中应该能偶发触发 NPE

2. shouldReproduceOriginalTestFailure (20次重复)
   - 使用原始测试配置重复运行
   - 预期:在 GitHub Actions 中应该能偶发触发 NPE

验证目标:
- 如果这些测试在 GitHub Actions 中失败(NPE),证明测试有效
- 如果全部通过,需要调整延迟时间或重复次数
- 为后续修复提供可靠的验证基准

下一步:
- 推送到 99.99.x 分支触发 GitHub Actions
- 观察测试结果,确认能够复现 NPE
- 然后添加修复代码,再次验证

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
调整原因:
- 第一次运行 74 个测试未触发 NPE(符合 0.5% 失败率的概率分布)
- 根据历史数据(5/1000 次失败),需要更多测试次数才能稳定复现

新的测试配置:
- shouldReproduceNPEInRunnableParallel: 50 → 500 次
- shouldReproduceOriginalTestFailure: 20 → 500 次
- 总计: 1004 个测试(原 4 个 + 1000 次重复)

预期结果:
- 根据 0.5% 失败率,1000 次运行预期触发约 5 次 NPE
- 99% 概率至少触发 1 次 NPE
- 如果成功复现,将验证测试有效性,然后添加修复代码

TDD 原则:
- 必须先看到红色(NPE 失败)
- 然后才能看到绿色(修复后通过)
- 这样才能确信修复是真正有效的

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
优化原因:
- 之前运行了太多测试(1004 个),包括人工延迟的测试
- 更高效的方式:只重复运行历史上真正失败过的测试

新策略:
- 只运行 shouldOkWhenAiFlowWithExampleSelector 1000 次
- 使用真实的 ExampleSelector 配置(不添加人工延迟)
- 移除了模拟测试,更接近真实失败场景

预期:
- 运行更快(无人工延迟)
- 只运行 1003 个测试(原 3 个 + 1000 次重复)
- 根据 0.5% 失败率,1000 次应该触发约 5 次 NPE

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
问题验证(TDD 红色阶段):
✅ Run ID: 20535263656
✅ 测试失败:1003 个测试中 1 个错误
✅ 错误:NullPointerException at Tip.merge(Tip.java:121)
✅ 调用链:AiStart.runnableParallel → Fork.process → acc.merge(null)

根本原因:
- Fork.join() 设计上会在前 N-1 个分支完成时返回 null
- AiStart.runnableParallel() 没有正确处理这个 null 值
- 导致偶发的 acc.merge(null) → NPE

修复方案:
1. AiStart.java:596-600 - 在 reducer 中添加 null 检查,过滤 Fork 返回的 null 值
2. Tip.java:121 - 添加防御性 null 验证

预期效果(TDD 绿色阶段):
- 1003 个测试全部通过
- NPE 失败率从 0.5% 降至 0%
- CI/CD 稳定性提升

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
问题分析:
✅ Run 20535263656: 成功复现 NPE(第 15 次重复)
❌ Run 20535342469: 第一次修复失败(导致 IllegalStateException: Required parameters are missing)

第一次修复的问题:
- 在 AiStart 中添加了 if (data != null) 检查
- 这导致 null 分支的数据被跳过
- 最终导致模板渲染时参数丢失

新的修复策略:
- 只在 Tip.merge() 中处理 null
- 如果 other 为 null,返回 this(不改变当前 Tip)
- 这样既避免了 NPE,也不会丢失数据

理由:
1. 更简单:只在一个地方处理 null
2. 更安全:不会导致数据丢失
3. 更合理:null 分支本来就没有数据可merge

预期效果:
- 1003 个测试全部通过
- NPE 失败率从 0.5% 降至 0%

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
添加详细的日志记录以追踪:
1. Fork.join() reducer 接收的每个数据值
2. Pattern 执行的输入和输出
3. null 值出现的完整堆栈跟踪
4. 线程信息以分析并发行为

这些日志将帮助我们理解为什么 input.getData() 会返回 null,
从而找到 NPE 的真正根源。

相关 issue: #247

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
将日志框架从 lombok.extern.slf4j.Slf4j 改为
modelengine.fitframework.log.Logger 以符合项目规范。

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
将所有诊断日志从 DEBUG 改为 WARN 级别,
确保在测试运行时能够捕获到这些关键信息。

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
使用 System.err.println 直接输出诊断信息,
确保无论日志配置如何都能看到输出。

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
## 问题描述
在并发场景下,Fork.join() 的 reducer 可能接收到 null 作为
data 参数,导致 NPE 或参数丢失。

## 根本原因
waterflow 框架在某些竞态条件下,可能传递 data 为 null 的
FlowContext 给 Fork 的 wrapper。详见 issue #247。

## 修复方案
采用防御性编程,在两个层面处理 null:

1. **AiStart.runnableParallel()** (主要修复):
   - 在 reducer 中检查 data 是否为 null
   - 如果为 null,记录警告并保持累加器不变
   - 避免 NPE 并保留已有数据

2. **Tip.merge()** (次要防御):
   - 保留 null 检查作为最后防线
   - 清理诊断代码,只保留核心逻辑

## 测试验证
- 本地测试:1000 次运行全部通过
- GitHub Actions:待验证

## 相关 Issue
Fixes #247

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
- 移除 AiStart 中的提前返回,始终调用 acc.merge(data)
- 依赖 Tip.merge() 内部的 null 检查来处理 null 情况
- 保留警告日志以便追踪并发竞态条件
- 这样可以保证处理逻辑的一致性,避免数据丢失

相关 issue: #247
在关键调用链的每个层级添加 System.err 诊断输出:

1. Fork.java:96 - processor 调用前后
   - 记录 input.getData(), acc, branchCount 等关键状态

2. AiStart.java:605 - merge 调用前后
   - 记录 acc, data 参数和 merge 结果

3. Tip.merge() - merge 内部
   - 记录 this, other 参数和 null 处理逻辑

这将帮助我们理解:
- 哪个分支的数据为 null
- null 是在哪个环节产生的
- Fork 的聚合逻辑是如何执行的
- 完整的数据流动路径

相关 issue: #247
将 inputData 的类型从 Object 改为泛型 O,
以匹配 processor.process() 的参数类型要求。
问题根因:在并发场景下,Fork.join() 的 reducer 接收到 input.getData() = null,
导致 NPE 或数据丢失("Required parameters are missing")。

修复方案(阶段1):
- Fork.java: 添加智能 null 处理,跳过 null 分支避免崩溃
- 使用 Logger.warn() 记录异常情况,便于监控
- 清理所有 System.err 诊断代码
- Tip.merge(): 保留防御性 null 检查

技术细节:
- 当 inputData 为 null 时,记录警告日志并跳过此分支
- 如果是最后一个分支,返回已有数据(避免整个流程失败)
- 保留 Tip.merge() 的 null 检查作为额外防御层

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
修复问题:之前的修复在检测到 null 数据且是最后一个分支时会返回已有数据,
导致聚合提前完成,丢失分支数据。

改进策略:当 inputData 为 null 时,不更新分支计数,直接返回 null,
等待正确的数据到来后正常完成聚合。

这样可以避免因竞态条件导致的数据丢失问题。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1. 清理代码:
   - Fork.java: 移除注释和日志,保留简洁的 null 检查
   - Tip.java: 移除注释,保留防御性 null 检查

2. 优化测试结构:
   - shouldOkWhenAiFlowWithExampleSelector: 恢复为单次测试
   - shouldStableWhenRunnableParallelUnderConcurrency: 新增专门的并发稳定性测试(1000次重复)

详细的问题分析和修复说明已同步到 Issue #247

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
在并发场景下,window.peekAndConsume() 可能返回 null,
导致后续调用 peekedToken.finishConsume() 时抛出 NPE。

添加 null 检查以防止此类错误。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
清理之前添加诊断代码时引入的 Logger 导入和声明。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@CodeCasterX CodeCasterX self-assigned this Jan 13, 2026
@CodeCasterX CodeCasterX linked an issue Jan 13, 2026 that may be closed by this pull request
4 tasks
@CodeCasterX CodeCasterX added this to the 3.6.3 milestone Jan 13, 2026
@CodeCasterX CodeCasterX added type: bug A general bug in: fel Issues in FEL(FIT Expression for LLM) modules labels Jan 13, 2026
@CodeCasterX CodeCasterX merged commit e49f1db into 3.6.x Jan 13, 2026
2 checks passed
@CodeCasterX CodeCasterX deleted the 99.99.x branch January 13, 2026 02:36
@CodeCasterX CodeCasterX changed the title fix: 修复 Fork.join() 并发 NPE 问题 fix(framework): 修复 Fork.join() 并发空指针异常 Jan 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

in: fel Issues in FEL(FIT Expression for LLM) modules type: bug A general bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

fix(fel): 修复 Action 执行时的偶发报错

2 participants