Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent-flow/src/components/base/jadeNode.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ export const jadeNode = (id, x, y, width, height, parent, drawer) => {
* @returns {number} 连接数。
*/
/**
 * Maximum number of outgoing links allowed from this node's connector.
 * When the graph disables connection limits, a generous cap (100) replaces
 * the default of a single link.
 *
 * @returns {number} the link count limit.
 */
self.maxNumToLink = () => {
  return self.graph?.connectionLimitDisabled ? 100 : 1;
};

/**
Expand Down
4 changes: 3 additions & 1 deletion agent-flow/src/components/base/validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export class NormalNodeConnectorValidator extends Validator {
validate() {
const nextEvents = this.node.getNextRunnableEvents();
const i18n = this.node.graph.i18n;
if (nextEvents.length !== 1) {
const isConnectionLimitDisabled = Boolean(this.node.graph?.connectionLimitDisabled);
const isValid = isConnectionLimitDisabled ? nextEvents.length >= 1 : nextEvents.length === 1;
if (!isValid) {
return Promise.reject({
errorFields: [{
errors: [`${i18n?.t('node') ?? 'node'} ${this.node.text} ${i18n?.t('problemWithConnection') ?? 'problemWithConnection'}`],
Expand Down
2 changes: 1 addition & 1 deletion agent-flow/src/components/code/codeNodeState.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export const codeNodeState = (id, x, y, width, height, parent, drawer) => {
* @override
*/
/**
 * Maximum number of outgoing links for a code node. Defaults to 10; raised to
 * 100 when the graph disables connection limits.
 *
 * @override
 * @returns {number} the link count limit.
 */
self.maxNumToLink = () => {
  return self.graph?.connectionLimitDisabled ? 100 : 10;
};

return self;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export const variableAggregationNodeState = (id, x, y, width, height, parent, dr
* @override
*/
/**
 * Maximum number of outgoing links for a variable-aggregation node. Defaults
 * to 10; raised to 100 when the graph disables connection limits.
 *
 * @override
 * @returns {number} the link count limit.
 */
self.maxNumToLink = () => {
  return self.graph?.connectionLimitDisabled ? 100 : 10;
};

return self;
Expand Down
10 changes: 8 additions & 2 deletions agent-flow/src/flow/jadeFlowEntry.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ const jadeFlowAgent = (graph) => {
graph.destroy();
};

/**
 * Toggles graph-wide enforcement of connector link limits.
 *
 * @param disabled truthy to disable the limits; coerced to a strict boolean.
 */
self.setConnectionLimitDisabled = (disabled) => {
  const flag = !!disabled;
  graph.connectionLimitDisabled = flag;
};

return self;
};

Expand Down Expand Up @@ -432,6 +436,7 @@ export const JadeFlow = (() => {
div,
tenant,
appId,
connectionLimitDisabled = false,
flowConfigData,
configs,
i18n,
Expand All @@ -440,7 +445,7 @@ export const JadeFlow = (() => {
}) => {
const graphDom = getGraphDom(div);
const g = jadeFlowGraph(div, 'jadeFlow');
await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements);
await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled);
g.flowType = flowType;
const pageData = g.getPageData(0);
await g.editFlow(0, graphDom, pageData.id);
Expand Down Expand Up @@ -470,8 +475,9 @@ export const JadeFlow = (() => {
return jadeFlowAgent(g);
};

const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements) => {
const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled = false) => {
g.collaboration.mute = true;
g.connectionLimitDisabled = Boolean(connectionLimitDisabled);
g.configs = configs;
g.i18n = i18n;
for (let i = 0; i < importStatements.length; i++) {
Expand Down
1 change: 1 addition & 0 deletions agent-flow/src/flow/jadeFlowGraph.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export const jadeFlowGraph = (div, title) => {
const self = defaultGraph(div, title);
self.type = 'jadeFlowGraph';
self.pageType = 'jadeFlowPage';
self.connectionLimitDisabled = false;
self.enableText = false;
self.flowMeta = {
exceptionFitables: ['modelengine.fit.jober.aipp.fitable.AippFlowExceptionHandler'],
Expand Down
8 changes: 6 additions & 2 deletions agent-flow/src/flow/jadeFlowPage.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export const jadeFlowPage = (div, graph, name, id) => {
self.addEventListener('COPY_SHAPE', shapeChangeListener);
self.addEventListener('DELETE_SHAPE', shapeChangeListener);

// True when the owning graph has explicitly switched off connector link limits.
const isConnectionLimitDisabled = () => {
  const limitDisabled = self.graph?.connectionLimitDisabled;
  return Boolean(limitDisabled);
};

/**
* @override
*/
Expand Down Expand Up @@ -305,7 +307,7 @@ export const jadeFlowPage = (div, graph, name, id) => {
*/
/**
 * Whether a new line may be dragged out of the given connector.
 * A connector normally supports a single outgoing line; when the graph
 * disables connection limits, up to 10 lines are allowed.
 *
 * @param node the node owning the connector.
 * @param connector the source connector being dragged from.
 * @returns {boolean} true when another outgoing line may be created.
 */
self.canDragOut = (node, connector) => {
  // Count outgoing events already attached to this exact connector.
  const lines = self.getEvents().filter(s => s.fromShape === node.id && s.getDefinedFromConnector() === connector);
  return lines.length < (isConnectionLimitDisabled() ? 10 : 1);
};

/**
Expand All @@ -330,7 +332,9 @@ export const jadeFlowPage = (div, graph, name, id) => {
}
};

return jadeEvent.fromShape !== jadeEvent.toShape && isConnectorAllowToLink() && isConnectorWithinLimit();
return jadeEvent.fromShape !== jadeEvent.toShape
&& isConnectorAllowToLink()
&& (isConnectionLimitDisabled() || isConnectorWithinLimit());
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public static RunContext from(CreateAppChatRequest request, OperationContext con
.orElseGet(() -> CreateAppChatRequest.Context.builder().build());
RunContext runContext = new RunContext(new HashMap<>(), context);
runContext.putAllToBusiness(requestContext.getUserContext());

// runContext请求操作
runContext.setUseMemory(requestContext.getUseMemory());
runContext.setDimension(requestContext.getDimension());
runContext.setChatId(request.getChatId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public final class FlowContext<T> extends IdGenerator {
/**
* 通过from.offer(data)而不是.offer(context)发起的数据会新增一个trace,这个trace会延续到flow end
*/

@Getter
private final Set<String> traceId;

Expand Down Expand Up @@ -265,6 +266,29 @@ public <R> List<FlowContext<R>> generate(List<R> data, String position) {
return data.stream().map(d -> this.generate(d, position, LocalDateTime.now())).collect(Collectors.toList());
}

/**
 * Forks a new context for one-to-many branch dispatch. The fork carries the same
 * wrapped data and inherits this context's runtime metadata, but receives a newly
 * generated context id (its {@code previous} points back at this context).
 *
 * @return the new branch context
 */
public FlowContext<T> fork() {
return this.convertData(this.data);
}

/**
 * Creates a copy of this context wrapping the given data and chained after this one:
 * the copy's {@code previous} is set to this context's id and it keeps the same
 * {@code nextPositionId}.
 *
 * @param <R> type of the data wrapped by the new context
 * @param data the data the new context wraps
 * @return the converted context
 */
public <R> FlowContext<R> convertData(R data) {
FlowContext<R> context = this.copyContext(data);
context.previous = this.id;
context.nextPositionId = this.nextPositionId;
return context;
}

/**
* 用于when.convert数据时候的转换context,除了包裹的数据类型不一样,所有其他信息都一样
*
Expand All @@ -274,12 +298,17 @@ public <R> List<FlowContext<R>> generate(List<R> data, String position) {
* @return 转换后的context
*/
public <R> FlowContext<R> convertData(R data, String id) {
FlowContext<R> context = this.copyContext(data);
context.previous = this.previous;
context.id = id;
return context;
}

private <R> FlowContext<R> copyContext(R data) {
FlowContext<R> context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position,
this.parallel, this.parallelMode, LocalDateTime.now());
context.previous = this.previous;
context.status = this.status;
context.trans = this.trans;
context.id = id;
context.batchId = this.batchId;
context.toBatch = this.toBatch;
context.createAt = this.createAt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,16 @@ default void update(List<FlowContext<T>> contexts) {
}

/**
 * Updates the status of the given contexts to SENT.
 *
 * @param contexts the contexts whose status is updated
 */
void updateToSent(List<FlowContext<T>> contexts);

/**
 * Updates the status of the given contexts to READY.
 *
 * @param contexts the contexts whose status is updated
 */
void updateToReady(List<FlowContext<T>> contexts);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,14 @@ public FitStream.Publisher<FlowData> convertToFlow(FlowContextRepo<FlowData> rep
nodeMap.values().forEach((fromNode) -> {
fromNode.setParentFlow(this);
Optional.ofNullable(fromNode.getJober()).ifPresent(jober -> jober.setContextRepo(repo));

fromNode.getEvents().forEach(event -> {
// startNode不能出现在event的to属性, endNode不能出现在event的from属性

FlowNode toNode = nodeMap.get(event.getTo());

fromNode.subscribe(streamId, flowEnv, toNode, event);

});
});
return getFlowNode(FlowNodeType.START).getPublisher(streamId, repo, messenger, locks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class FlowStateNode extends FlowNode {
*/
@Override
public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, FlowContextRepo<FlowData> repo,
FlowContextMessenger messenger, FlowLocks locks) {
FlowContextMessenger messenger, FlowLocks locks) {
if (!Optional.ofNullable(this.processor).isPresent()) {
Node<FlowData, FlowData> node = new Node<>(streamId, this.metaId, this::stateProduce, repo, messenger,
locks, this.type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -322,26 +323,64 @@ public void offer(List<FlowContext<I>> contexts, Consumer<PreSendCallbackInfo<I>
// qualifiedWhens表示的与from节点连接的所有事件,条件节点符合条件的事件在这里筛选,在事件上处理需要下发的context
java.util.Map<FitStream.Subscription<I, ?>, List<FlowContext<I>>> matchedContexts = new LinkedHashMap<>();
Set<FlowContext<I>> matchedContextSet = new HashSet<>();
qualifiedWhens.forEach(
w -> {
List<FlowContext<I>> afterContexts = contexts
.stream()
.filter(c -> w.getWhether().is(c))
.peek(c -> c.setNextPositionId(w.getId()))
.collect(Collectors.toList());
matchedContexts.put(w, afterContexts);
matchedContextSet.addAll(afterContexts);
List<FlowContext<I>> forkedContexts = new ArrayList<>();
for (FlowContext<I> contextItem : contexts) {
List<FitStream.Subscription<I, ?>> matchedSubscriptions = qualifiedWhens.stream()
.filter(w -> w.getWhether().is(contextItem))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(matchedSubscriptions)) {
continue;
}
matchedContextSet.add(contextItem);
for (int index = 0; index < matchedSubscriptions.size(); index++) {
FitStream.Subscription<I, ?> matchedSubscription = matchedSubscriptions.get(index);
FlowContext<I> branchContext = index == 0 ? contextItem : contextItem.fork();
branchContext.setNextPositionId(matchedSubscription.getId());
matchedContexts.computeIfAbsent(matchedSubscription, key -> new ArrayList<>()).add(branchContext);
if (index > 0) {
forkedContexts.add(branchContext);
}
);
}
}
qualifiedWhens.forEach(w -> matchedContexts.computeIfAbsent(w, key -> new ArrayList<>()));
List<FlowContext<I>> unMatchedContexts = contexts
.stream()
.filter(c -> !matchedContextSet.contains(c))
.collect(Collectors.toList());
PreSendCallbackInfo<I> callbackInfo = new PreSendCallbackInfo<>(matchedContexts, unMatchedContexts);
preSendCallback.accept(callbackInfo);
persistForkedContexts(forkedContexts, matchedContexts);
matchedContexts.forEach(FitStream.Subscription::cache);
}

/**
 * Persists forked branch contexts so downstream nodes can process them.
 * Only forks that actually ended up in some subscription's matched list are saved;
 * the repository update runs under a distributed node-level lock so the trace
 * context pool stays consistent across concurrent workers.
 *
 * @param forkedContexts contexts created via fork() during branch matching
 * @param matchedContexts map of subscription to its matched contexts (may contain the forks)
 */
private void persistForkedContexts(List<FlowContext<I>> forkedContexts,
java.util.Map<FitStream.Subscription<I, ?>, List<FlowContext<I>>> matchedContexts) {
if (CollectionUtils.isEmpty(forkedContexts)) {
return;
}
// Keep only forks that were actually routed to a subscription.
Set<String> forkedIds = forkedContexts.stream().map(FlowContext::getId).collect(Collectors.toSet());
List<FlowContext<I>> effectiveForkedContexts = matchedContexts.values()
.stream()
.flatMap(List::stream)
.filter(context -> forkedIds.contains(context.getId()))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(effectiveForkedContexts)) {
return;
}
// Collect every trace the effective forks belong to; the pool update is per trace.
Set<String> traces = effectiveForkedContexts.stream()
.flatMap(context -> context.getTraceId().stream())
.collect(Collectors.toSet());
Lock lock = this.locks.getDistributedLock(this.locks.streamNodeLockKey(this.streamId, this.id,
"ForkContextPool"));
lock.lock();
try {
// Register the forks in the trace context pool before saving, under the same lock.
this.repo.updateContextPool(effectiveForkedContexts, traces);
this.repo.save(effectiveForkedContexts);
} finally {
lock.unlock();
}
}

/**
* 是否有publisher目标
* 用于stream闭环时将没有subscribed的publisher关闭到close subscriber
Expand Down
Loading