-
Notifications
You must be signed in to change notification settings - Fork 329
fix(framework): 修复 Fork.join() 并发空指针异常 #404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
问题背景: - PatternTest.shouldOkWhenAiFlowWithExampleSelector 在 GitHub Actions 中偶发失败 - 错误: NullPointerException at Tip.merge(Tip.java:121) - 失败率: 0.5% (5次/1000次运行, 平均每22天一次) - 只在 GitHub Actions 环境中出现,本地无法稳定复现 测试策略(TDD 红色阶段): 1. shouldReproduceNPEInRunnableParallel (50次重复) - 使用 200ms 延迟制造快慢分支,增大竞态窗口 - 预期:在 GitHub Actions 中应该能偶发触发 NPE 2. shouldReproduceOriginalTestFailure (20次重复) - 使用原始测试配置重复运行 - 预期:在 GitHub Actions 中应该能偶发触发 NPE 验证目标: - 如果这些测试在 GitHub Actions 中失败(NPE),证明测试有效 - 如果全部通过,需要调整延迟时间或重复次数 - 为后续修复提供可靠的验证基准 下一步: - 推送到 99.99.x 分支触发 GitHub Actions - 观察测试结果,确认能够复现 NPE - 然后添加修复代码,再次验证 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
调整原因: - 第一次运行 74 个测试未触发 NPE(符合 0.5% 失败率的概率分布) - 根据历史数据(5/1000 次失败),需要更多测试次数才能稳定复现 新的测试配置: - shouldReproduceNPEInRunnableParallel: 50 → 500 次 - shouldReproduceOriginalTestFailure: 20 → 500 次 - 总计: 1004 个测试(原 4 个 + 1000 次重复) 预期结果: - 根据 0.5% 失败率,1000 次运行预期触发约 5 次 NPE - 99% 概率至少触发 1 次 NPE - 如果成功复现,将验证测试有效性,然后添加修复代码 TDD 原则: - 必须先看到红色(NPE 失败) - 然后才能看到绿色(修复后通过) - 这样才能确信修复是真正有效的 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
优化原因: - 之前运行了太多测试(1004 个),包括人工延迟的测试 - 更高效的方式:只重复运行历史上真正失败过的测试 新策略: - 只运行 shouldOkWhenAiFlowWithExampleSelector 1000 次 - 使用真实的 ExampleSelector 配置(不添加人工延迟) - 移除了模拟测试,更接近真实失败场景 预期: - 运行更快(无人工延迟) - 只运行 1003 个测试(原 3 个 + 1000 次重复) - 根据 0.5% 失败率,1000 次应该触发约 5 次 NPE 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
问题验证(TDD 红色阶段): ✅ Run ID: 20535263656 ✅ 测试失败:1003 个测试中 1 个错误 ✅ 错误:NullPointerException at Tip.merge(Tip.java:121) ✅ 调用链:AiStart.runnableParallel → Fork.process → acc.merge(null) 根本原因: - Fork.join() 设计上会在前 N-1 个分支完成时返回 null - AiStart.runnableParallel() 没有正确处理这个 null 值 - 导致偶发的 acc.merge(null) → NPE 修复方案: 1. AiStart.java:596-600 - 在 reducer 中添加 null 检查,过滤 Fork 返回的 null 值 2. Tip.java:121 - 添加防御性 null 验证 预期效果(TDD 绿色阶段): - 1003 个测试全部通过 - NPE 失败率从 0.5% 降至 0% - CI/CD 稳定性提升 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
问题分析: ✅ Run 20535263656: 成功复现 NPE(第 15 次重复) ❌ Run 20535342469: 第一次修复失败(导致 IllegalStateException: Required parameters are missing) 第一次修复的问题: - 在 AiStart 中添加了 if (data != null) 检查 - 这导致 null 分支的数据被跳过 - 最终导致模板渲染时参数丢失 新的修复策略: - 只在 Tip.merge() 中处理 null - 如果 other 为 null,返回 this(不改变当前 Tip) - 这样既避免了 NPE,也不会丢失数据 理由: 1. 更简单:只在一个地方处理 null 2. 更安全:不会导致数据丢失 3. 更合理:null 分支本来就没有数据可merge 预期效果: - 1003 个测试全部通过 - NPE 失败率从 0.5% 降至 0% 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
添加详细的日志记录以追踪: 1. Fork.join() reducer 接收的每个数据值 2. Pattern 执行的输入和输出 3. null 值出现的完整堆栈跟踪 4. 线程信息以分析并发行为 这些日志将帮助我们理解为什么 input.getData() 会返回 null, 从而找到 NPE 的真正根源。 相关 issue: #247 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
将日志框架从 lombok.extern.slf4j.Slf4j 改为 modelengine.fitframework.log.Logger 以符合项目规范。 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
将所有诊断日志从 DEBUG 改为 WARN 级别, 确保在测试运行时能够捕获到这些关键信息。 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
使用 System.err.println 直接输出诊断信息, 确保无论日志配置如何都能看到输出。 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
## 问题描述 在并发场景下,Fork.join() 的 reducer 可能接收到 null 作为 data 参数,导致 NPE 或参数丢失。 ## 根本原因 waterflow 框架在某些竞态条件下,可能传递 data 为 null 的 FlowContext 给 Fork 的 wrapper。详见 issue #247。 ## 修复方案 采用防御性编程,在两个层面处理 null: 1. **AiStart.runnableParallel()** (主要修复): - 在 reducer 中检查 data 是否为 null - 如果为 null,记录警告并保持累加器不变 - 避免 NPE 并保留已有数据 2. **Tip.merge()** (次要防御): - 保留 null 检查作为最后防线 - 清理诊断代码,只保留核心逻辑 ## 测试验证 - 本地测试:1000 次运行全部通过 - GitHub Actions:待验证 ## 相关 Issue Fixes #247 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
- 移除 AiStart 中的提前返回,始终调用 acc.merge(data) - 依赖 Tip.merge() 内部的 null 检查来处理 null 情况 - 保留警告日志以便追踪并发竞态条件 - 这样可以保证处理逻辑的一致性,避免数据丢失 相关 issue: #247
在关键调用链的每个层级添加 System.err 诊断输出: 1. Fork.java:96 - processor 调用前后 - 记录 input.getData(), acc, branchCount 等关键状态 2. AiStart.java:605 - merge 调用前后 - 记录 acc, data 参数和 merge 结果 3. Tip.merge() - merge 内部 - 记录 this, other 参数和 null 处理逻辑 这将帮助我们理解: - 哪个分支的数据为 null - null 是在哪个环节产生的 - Fork 的聚合逻辑是如何执行的 - 完整的数据流动路径 相关 issue: #247
将 inputData 的类型从 Object 改为泛型 O, 以匹配 processor.process() 的参数类型要求。
问题根因:在并发场景下,Fork.join() 的 reducer 接收到 input.getData() = null, 导致 NPE 或数据丢失("Required parameters are missing")。 修复方案(阶段1): - Fork.java: 添加智能 null 处理,跳过 null 分支避免崩溃 - 使用 Logger.warn() 记录异常情况,便于监控 - 清理所有 System.err 诊断代码 - Tip.merge(): 保留防御性 null 检查 技术细节: - 当 inputData 为 null 时,记录警告日志并跳过此分支 - 如果是最后一个分支,返回已有数据(避免整个流程失败) - 保留 Tip.merge() 的 null 检查作为额外防御层 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
修复问题:之前的修复在检测到 null 数据且是最后一个分支时会返回已有数据, 导致聚合提前完成,丢失分支数据。 改进策略:当 inputData 为 null 时,不更新分支计数,直接返回 null, 等待正确的数据到来后正常完成聚合。 这样可以避免因竞态条件导致的数据丢失问题。 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1. 清理代码: - Fork.java: 移除注释和日志,保留简洁的 null 检查 - Tip.java: 移除注释,保留防御性 null 检查 2. 优化测试结构: - shouldOkWhenAiFlowWithExampleSelector: 恢复为单次测试 - shouldStableWhenRunnableParallelUnderConcurrency: 新增专门的并发稳定性测试(1000次重复) 详细的问题分析和修复说明已同步到 Issue #247 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
在并发场景下,window.peekAndConsume() 可能返回 null, 导致后续调用 peekedToken.finishConsume() 时抛出 NPE。 添加 null 检查以防止此类错误。 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
清理之前添加诊断代码时引入的 Logger 导入和声明。 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
4 tasks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
🔗 相关问题 / Related Issue
Issue 链接 / Issue Link: #247
📋 变更类型 / Type of Change
📝 变更目的 / Purpose of the Change
修复 waterflow 框架在高并发场景下的 NPE 问题。根本原因是并发竞态条件导致:
Fork.join()中input.getData()可能返回 nullTo.java中window.peekAndConsume()可能返回 null这些问题在 CI 环境的并发测试中有约 0.1-0.2% 的失败率(1000 次运行中 1-2 次失败)。
📋 主要变更 / Brief Changelog
Fork.java:98-100添加 null 检查,当inputData == null时返回 null 等待有效数据To.java:940添加 null 检查,当peekedToken == null时跳过finishConsume()调用Tip.java:56-58保留防御性 null 检查PatternTest.java新增shouldStableWhenRunnableParallelUnderConcurrency()并发稳定性测试(@RepeatedTest(1000))🧪 验证变更 / Verifying this Change
测试步骤 / Test Steps
mvn test -Dtest=PatternTest#shouldStableWhenRunnableParallelUnderConcurrencymvn clean install测试覆盖 / Test Coverage
测试结果:
✅ 贡献者检查清单 / Contributor Checklist
基本要求 / Basic Requirements:
代码质量 / Code Quality:
测试要求 / Testing Requirements:
mvn -B clean package -Dmaven.test.skip=true/ Basic checks passmvn clean install/ Unit tests pass文档和兼容性 / Documentation and Compatibility:
📋 附加信息 / Additional Notes
根本原因分析:
waterflow 框架的流式处理架构在并发场景下存在竞态条件。当多个分支并发执行时,某些分支可能会提前完成并清理上下文,导致后续分支读取到 null 数据。
修复策略:
采用防御性编程,在所有可能为 null 的位置添加检查,确保系统在遇到 null 时能够安全降级处理,而不是抛出 NPE。
后续计划:
考虑在框架层面优化并发数据同步机制,从根本上减少竞态条件的发生。
审查者注意事项 / Reviewer Notes:
Fork.java:98-100的 null 检查逻辑,确保不会导致死锁或无限等待To.java:940的修复不会影响正常的窗口生命周期管理Closes #247
🤖 Generated with Claude Code