Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
8c010cb
test: 添加 NPE 复现测试用于 GitHub Actions 验证
CodeCasterX Dec 27, 2025
7ece734
test: 增加 NPE 复现测试重复次数到 1000 次
CodeCasterX Dec 27, 2025
af500e0
test: 简化 NPE 复现策略 - 只重复运行原始失败测试
CodeCasterX Dec 27, 2025
df6fcc6
fix: 修复 runnableParallel 中的 NPE 竞态条件
CodeCasterX Dec 27, 2025
a7c00b8
fix: 修复 Tip.merge() 的 null 处理逻辑
CodeCasterX Dec 27, 2025
0376a2b
feat: 添加诊断日志以定位 Fork.join() NPE 根本原因
CodeCasterX Dec 27, 2025
4abac1a
fix: 使用项目标准 Logger 替代 Lombok @Slf4j
CodeCasterX Dec 27, 2025
46b035f
fix: 将诊断日志级别提升为 WARN 以确保输出
CodeCasterX Dec 27, 2025
395a827
debug: 在 Tip.merge() 中添加 System.err 诊断输出
CodeCasterX Dec 27, 2025
fb083c5
fix: 防御性处理 Fork.join 并发竞态导致的 null 数据
CodeCasterX Dec 27, 2025
6fa817e
fix: 改进 null 数据处理逻辑,始终调用 Tip.merge() 以避免逻辑不一致
CodeCasterX Dec 27, 2025
753020c
chore: 添加完整的诊断输出以追踪 NPE 根本原因
CodeCasterX Dec 28, 2025
a8d9b99
fix: 修复 Fork.java 的类型转换编译错误
CodeCasterX Dec 28, 2025
150abf1
fix: 修复 Fork.join() 并发场景下的 null 数据处理 (#247)
CodeCasterX Jan 12, 2026
4a8a1b2
fix: 改进 Fork.join() null 数据处理逻辑
CodeCasterX Jan 12, 2026
00337d9
refactor: 清理代码并优化测试结构
CodeCasterX Jan 12, 2026
108872a
fix: 修复 To.java 中 peekedToken 可能为 null 的问题
CodeCasterX Jan 12, 2026
db235c9
chore: 移除 AiStart.java 中未使用的 Logger
CodeCasterX Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ public Tip addAll(Map<String, Content> args) {
* @return 表示当前的 {@link Tip}。
*/
public Tip merge(Tip other) {
if (other == null) {
return this;
}
return this.addAll(other.values);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,10 +592,7 @@ public final AiState<Tip, D, Tip, RF, F> runnableParallel(Pattern<O, Tip>... pat
.orElseGet(() -> new AiParallel<>(this.start.parallel(), mineFlow).fork(branchProcessor));
}

AiState<Tip, D, Tip, RF, F> state = aiFork.join(Tip::new, (acc, data) -> {
acc.merge(data);
return acc;
});
AiState<Tip, D, Tip, RF, F> state = aiFork.join(Tip::new, (acc, data) -> acc.merge(data));
((Processor<?, ?>) state.publisher()).displayAs("runnableParallel");
return state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import modelengine.fitframework.util.StringUtils;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import java.util.Collection;
Expand Down Expand Up @@ -97,6 +98,23 @@ void shouldOkWhenAiFlowWithExampleSelector() {
assertThat(converse.offer("1+2").await().text()).isEqualTo("2+2=4\n2+3=5\n1+2=");
}

@RepeatedTest(1000)
@DisplayName("测试 RunnableParallel 并发稳定性")
void shouldStableWhenRunnableParallelUnderConcurrency() {
Example[] examples = {new DefaultExample("2+2", "4"), new DefaultExample("2+3", "5")};
Conversation<String, Prompt> converse = AiFlows.<String>create()
.runnableParallel(question(),
fewShot(ExampleSelector.builder()
.template("{{q}}={{a}}", "q", "a")
.delimiter("\n")
.example(examples)
.build()))
.prompt(Prompts.human("{{examples}}\n{{question}}="))
.close()
.converse();
assertThat(converse.offer("1+2").await().text()).isEqualTo("2+2=4\n2+3=5\n1+2=");
}

@Test
@DisplayName("测试 Retriever")
void shouldOkWhenAiFlowWithRetriever() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,13 @@ public synchronized R process(FlowContext<O> input) {
acc = Tuple.from((R) "", 0);
}
}
acc = Tuple.from(processor.process(acc.first(), input.getData()), acc.second() + 1);

O inputData = input.getData();
if (inputData == null) {
return null;
}
R processedResult = processor.process(acc.first(), inputData);
acc = Tuple.from(processedResult, acc.second() + 1);
accs.put(key, acc);

if (acc.second() == forkNumber.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,9 @@ public <T1, R1> List<FlowContext<R1>> process(To<T1, R1> to, List<FlowContext<T1
nextSession.getWindow().complete();
}
} else {
peekedToken.finishConsume();
if (peekedToken != null) {
peekedToken.finishConsume();
}
if (window.isDone()) {
window.tryFinish();
}
Expand Down