Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CompleteContext extends FlowContext {
*/
public CompleteContext(FlowContext context, String position) {
super(context.getStreamId(), context.getRootId(), null, context.getTraceId(), position,
context.getParallel(), context.getParallelMode(), context.getSession());
context.getParallel(), context.getParallelMode(), context.getSession(), context.getCreateAt());
this.batchId = context.getBatchId();
this.setIndex(Constants.NOT_PRESERVED_INDEX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ public class FlowContext<T> extends IdGenerator implements StateContext {
@Setter
private Integer index;

/**
* 当前context接下来要走到位置:可以是连线或者节点id
*/
@Setter
@Getter
private String nextPositionId;

/**
* 创建一个 {@link FlowContext} 实例。
*
Expand All @@ -154,7 +161,7 @@ public class FlowContext<T> extends IdGenerator implements StateContext {
*/
public FlowContext(String streamId, String rootId, T data, Set<String> traceId, String position,
FlowSession session) {
this(streamId, rootId, data, traceId, position, "", "", session);
this(streamId, rootId, data, traceId, position, "", "", session, LocalDateTime.now());
}

/**
Expand All @@ -167,10 +174,11 @@ public FlowContext(String streamId, String rootId, T data, Set<String> traceId,
* @param position 表示上下文当前所处的位置的 {@link String}。
* @param parallel 表示并行节点唯一标识的 {@link String}。
* @param parallelMode 表示并行模式的 {@link String}。
* @param createAt 表示创建时间的 {@link LocalDateTime}。
* @param session 表示上下文会话信息的 {@link FlowSession}。
*/
public FlowContext(String streamId, String rootId, T data, Set<String> traceId, String position, String parallel,
String parallelMode, FlowSession session) {
String parallelMode, FlowSession session, LocalDateTime createAt) {
this.streamId = streamId;
this.rootId = rootId;
this.data = data;
Expand All @@ -179,7 +187,7 @@ public FlowContext(String streamId, String rootId, T data, Set<String> traceId,
this.position = position;
this.parallel = parallel;
this.parallelMode = parallelMode;
this.createAt = LocalDateTime.now();
this.createAt = createAt;
this.session = session;
this.index = this.createIndex(); // 0起始,说明保序
}
Expand Down Expand Up @@ -266,17 +274,19 @@ public FlowContext<T> toBatch(String toBatchId) {
* @param <R> 表示返回值类型的泛型参数。
* @param data 表示处理后数据的 {@link R}。
* @param position 表示处理后所处的节点的 {@link String}。
* @param createAt 表示创建时间的 {@link LocalDateTime}。
* @return 表示新的上下文的 {@link FlowContext}{@code <}{@link R}{@code >}。
*/
public <R> FlowContext<R> generate(R data, String position) {
public <R> FlowContext<R> generate(R data, String position, LocalDateTime createAt) {
FlowContext<R> context = new FlowContext<>(this.streamId,
this.rootId,
data,
this.traceId,
this.position,
this.parallel,
this.parallelMode,
this.session);
this.session,
createAt);
context.position = position;
context.previous = this.id;
context.batchId = this.batchId;
Expand All @@ -293,7 +303,9 @@ public <R> FlowContext<R> generate(R data, String position) {
* @return 表示新的上下文的 {@link List}{@code <}{@link FlowContext}{@code <}{@link R}{@code >}{@code >}。
*/
public <R> List<FlowContext<R>> generate(List<R> dataList, String position) {
return dataList.stream().map(data -> this.generate(data, position)).collect(Collectors.toList());
return dataList.stream()
.map(data -> this.generate(data, position, LocalDateTime.now()))
.collect(Collectors.toList());
}

/**
Expand All @@ -312,7 +324,8 @@ public <R> FlowContext<R> convertData(R data, String id) {
this.position,
this.parallel,
this.parallelMode,
this.session);
this.session,
LocalDateTime.now());
context.previous = this.previous;
context.status = this.status;
context.id = id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class FlowSession extends IdGenerator implements StateContext {
* @param preserved 是否保序
*/
public FlowSession(boolean preserved) {
this(UUID.randomUUID().toString(), preserved);
this.preserved = preserved;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fit.waterflow.domain.context;

import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;

/**
* 提供trace的归属服务
*
* @author 夏斐
* @since 2024/4/29
*/
public interface TraceOwner {
/**
* own
*
* @param traceId traceId
* @param transId transId
*/
void own(String traceId, String transId);

/**
* tryOwn
*
* @param traceId traceId
* @param transId transId
* @return boolean
*/
boolean tryOwn(String traceId, String transId);

/**
* release
*
* @param traceId traceId
*/
void release(String traceId);

/**
* isOwn
*
* @param traceId traceId
* @return boolean
*/
boolean isOwn(String traceId);

/**
* trace map中包含任意一个trace列表的值,返回true
*
* @param traceIds trace id列表
* @return true or false
*/
boolean isAnyOwn(Set<String> traceIds);

/**
* 获取链路标识列表。
*
* @return 链路标识列表。
*/
List<String> getTraces();

/**
* 获取链路标识列表。
*
* @param targetTransId 实例标识。
* @return 链路标识列表。
*/
List<String> getTraces(String targetTransId);

/**
* 移除所有失效的链路标识。
*
* @param invalidLock 失效的链路标识锁。
*/
void removeInvalidTrace(Lock invalidLock);

/**
* 判断trace是否在初始化保护期
* 针对首次offer trace先加入到内存,但是实际数据库中还未插入时的情况使用
*
* @param traceId traceId
* @return true-处于保护时间,false-超过保护时间
*/
boolean isInProtectTime(String traceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private void fire() {
cs.add(completeContext);
List contexts = node.getProcessMode().process(node, cs);
if (node instanceof Processor) {
((Processor<?, ?>) node).offer(contexts);
((Processor<?, ?>) node).offer(contexts, __ -> {});
}
completeContext = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public <I> void send(String nodeId, List<FlowContext<I>> contexts) {
* @param <I> 流程实例执行时的入参数据类型,用于泛型推倒
*/
@Override
public <I> void sendCallback(List<FlowContext<I>> contexts) {
public <I> void sendCallback(Object callback, List<FlowContext<I>> contexts) {
LOG.warn("FlowEngine memo messenger does not support sending events.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@

import modelengine.fit.waterflow.domain.context.FlowContext;
import modelengine.fit.waterflow.domain.context.FlowTrace;
import modelengine.fit.waterflow.domain.context.TraceOwner;
import modelengine.fit.waterflow.domain.enums.FlowNodeStatus;
import modelengine.fit.waterflow.domain.stream.operators.Operators;
import modelengine.fitframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -33,6 +37,55 @@ public class FlowContextMemoRepo implements FlowContextRepo {

private final boolean isReserveTerminal;

private final TraceOwner traceOwner = new TraceOwner() {
@Override
public void own(String traceId, String transId) {
}

@Override
public boolean tryOwn(String traceId, String transId) {
return true;
}

@Override
public void release(String traceId) {
}

@Override
public boolean isOwn(String traceId) {
return true;
}

@Override
public boolean isAnyOwn(Set<String> traceIds) {
return true;
}

@Override
public List<String> getTraces() {
return List.of();
}

@Override
public List<String> getTraces(String targetTransId) {
return List.of();
}

@Override
public void removeInvalidTrace(Lock invalidLock) {
}

@Override
public boolean isInProtectTime(String traceId) {
return false;
}
};

@Override
public TraceOwner getTraceOwner() {
return this.traceOwner;
}

/**
* 构造方法
*/
Expand Down Expand Up @@ -74,6 +127,11 @@ public <T> List<FlowContext<T>> getContextsByPosition(String streamId, String po
.filter(context -> context.getStatus().toString().equals(status)));
}

@Override
public <T> List<FlowContext<T>> findWithoutFlowDataByTraceId(String traceId) {
throw new IllegalStateException("Not support");
}

@Override
public <T> List<FlowContext<T>> getContextsByTrace(String traceId) {
return query(stream -> stream
Expand All @@ -95,6 +153,21 @@ public synchronized <T> void save(List<FlowContext<T>> contexts) {
});
}

@Override
public <T> void updateFlowDataAndToBatch(List<FlowContext<T>> contexts) {
save(contexts);
}

@Override
public synchronized <T> void updateFlowData(Map<String, T> flowDataList) {
flowDataList.forEach((contextId, data) -> {
FlowContext<T> flowContext = ObjectUtils.cast(this.contextsMap.get(contextId));
if (flowContext != null) {
flowContext.setData(data);
}
});
}

@Override
public <T> void updateToSent(List<FlowContext<T>> contexts) {
save(contexts);
Expand Down Expand Up @@ -125,6 +198,14 @@ public <T> List<FlowContext<T>> getByIds(List<String> ids) {
return ids.stream().map(i -> (FlowContext<T>) contextsMap.get(i)).collect(Collectors.toList());
}

@Override
public <T> List<FlowContext<T>> getByToBatch(List<String> toBatchIds) {
return query(stream -> stream
.filter(context -> context.getStatus().equals(FlowNodeStatus.PENDING))
.filter(FlowContext::isSent)
.filter(context -> toBatchIds.contains(context.getToBatch())));
}

@Override
public <T> List<FlowContext<T>> requestMappingContext(String streamId, List<String> subscriptions,
Map<String, Integer> sessions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ default <I> void send(ProcessType type, Subscriber<I, ?> subscriber, List<FlowCo
/**
* 发送回调函数事件到引擎外部
*
* @param callback 回调函数.
* @param contexts 流程实例执行过程产生的contexts
* @param <I> 流程实例执行时的入参数据类型,用于泛型推倒
*/
<I> void sendCallback(List<FlowContext<I>> contexts);
<I> void sendCallback(Object callback, List<FlowContext<I>> contexts);

/**
* Directly processes a list of flow contexts through the specified subscriber.
Expand Down
Loading