Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -329,30 +329,27 @@ public <V> Task<V> callActivity(
}

// Add router information for cross-app routing
// Router always has a source app ID from EXECUTIONSTARTED event
OrchestratorService.TaskRouter.Builder routerBuilder = OrchestratorService.TaskRouter.newBuilder()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code. Removed it.

.setSourceAppID(this.appId);

// Add target app ID if specified in options
if (options != null && options.hasAppID()) {
String targetAppId = options.getAppID();
OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.setTargetAppID(targetAppId)
.build();
scheduleTaskBuilder.setRouter(router);
this.logger.fine(() -> String.format(
"cross app routing detected: source=%s, target=%s",
this.appId, targetAppId));
if (this.appId != null && !this.appId.isEmpty()) {
// Add target app ID if specified in options
if (options != null && options.hasAppID()) {
String targetAppId = options.getAppID();
OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.setTargetAppID(targetAppId)
.build();
scheduleTaskBuilder.setRouter(router);
this.logger.fine(() -> String.format(
"cross app routing detected: source=%s, target=%s",
this.appId, targetAppId));
}
}
TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;
OrchestratorService.ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build();
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setScheduleTask(scheduleTaskBuilder);
if (options != null && options.hasAppID()) {
if (this.appId != null && !this.appId.isEmpty() && options != null && options.hasAppID()) {
String targetAppId = options.getAppID();
OrchestratorService.TaskRouter actionRouter = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
Expand Down Expand Up @@ -499,13 +496,40 @@ public <V> Task<V> callSubOrchestrator(
}
createSubOrchestrationActionBuilder.setInstanceId(instanceId);

// TODO: @cicoyle - add suborchestration cross app logic here when its supported
// Add router information for cross-app routing of sub-orchestrations
if (this.appId != null && !this.appId.isEmpty()) {
OrchestratorService.TaskRouter.Builder routerBuilder = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId);

// Add target app ID if specified in options
if (options != null && options.hasAppID()) {
routerBuilder.setTargetAppID(options.getAppID());
this.logger.fine(() -> String.format(
"cross app sub-orchestration routing detected: source=%s, target=%s",
this.appId, options.getAppID()));
}

createSubOrchestrationActionBuilder.setRouter(routerBuilder.build());
}

TaskFactory<V> taskFactory = () -> {
int id = this.sequenceNumber++;
this.pendingActions.put(id, OrchestratorService.OrchestratorAction.newBuilder()
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setCreateSubOrchestration(createSubOrchestrationActionBuilder)
.build());
.setCreateSubOrchestration(createSubOrchestrationActionBuilder);

// Set router on the OrchestratorAction for cross-app routing
if (this.appId != null && !this.appId.isEmpty()) {
OrchestratorService.TaskRouter.Builder actionRouterBuilder = OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId);
if (options != null && options.hasAppID()) {
actionRouterBuilder.setTargetAppID(options.getAppID());
}
actionBuilder.setRouter(actionRouterBuilder.build());
}

this.pendingActions.put(id, actionBuilder.build());

if (!this.isReplaying) {
this.logger.fine(() -> String.format(
Expand Down Expand Up @@ -941,11 +965,20 @@ private void completeInternal(
}

int id = this.sequenceNumber++;
OrchestratorService.OrchestratorAction action = OrchestratorService.OrchestratorAction.newBuilder()
OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction
.newBuilder()
.setId(id)
.setCompleteOrchestration(builder.build())
.build();
this.pendingActions.put(id, action);
.setCompleteOrchestration(builder.build());

// Add router to completion action for cross-app routing back to parent
if (this.appId != null && !this.appId.isEmpty()) {
actionBuilder.setRouter(
OrchestratorService.TaskRouter.newBuilder()
.setSourceAppID(this.appId)
.build());
}

this.pendingActions.put(id, actionBuilder.build());
this.isComplete = true;
}

Expand Down Expand Up @@ -1009,7 +1042,16 @@ private void processEvent(OrchestratorService.HistoryEvent e) {
this.setInput(executionStarted.getInput().getValue());
this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId());
this.logger.fine(() -> this.instanceId + ": Workflow execution started");
this.setAppId(e.getRouter().getSourceAppID());
// For cross-app suborchestrations, if the router has a target, use that as our appID
// since that's where we're actually executing
if (e.hasRouter()) {
OrchestratorService.TaskRouter router = e.getRouter();
if (router.hasTargetAppID()) {
this.setAppId(router.getTargetAppID());
} else {
this.setAppId(router.getSourceAppID());
}
}

var versionName = "";
if (!StringUtils.isEmpty(this.orchestratorVersionName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,138 @@ void subOrchestration() throws TimeoutException {
}
}

@Test
void subOrchestrationWithActivity() throws TimeoutException {
final String parentOrchestratorName = "ParentOrchestrator";
final String childOrchestratorName = "ChildOrchestrator";
final String activityName = "PlusOne";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int childResult = ctx.callSubOrchestrator(childOrchestratorName, input, int.class).await();
ctx.complete(childResult);
})
.addOrchestrator(childOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> ctx.getInput(int.class) + 1)
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, 10);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals(11, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationChain() throws TimeoutException {
final String orchestratorName = "ChainOrchestrator";
final String leafOrchestratorName = "LeafOrchestrator";
final String activityName = "Double";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
int input = ctx.getInput(int.class);
// Chain: parent calls child which calls leaf
int result = ctx.callSubOrchestrator(leafOrchestratorName, input, int.class).await();
// Call activity after sub-orchestration completes
result = ctx.callActivity(activityName, result, int.class).await();
ctx.complete(result);
})
.addOrchestrator(leafOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> ctx.getInput(int.class) * 2)
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
// input=3 -> leaf doubles to 6 -> parent doubles to 12
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 3);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals(12, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationFanOut() throws TimeoutException {
final String parentOrchestratorName = "FanOutParent";
final String childOrchestratorName = "FanOutChild";
final String activityName = "Square";
final int childCount = 5;

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
// Fan out: launch multiple sub-orchestrations in parallel
List<Task<Integer>> tasks = IntStream.range(1, childCount + 1)
.mapToObj(i -> ctx.callSubOrchestrator(childOrchestratorName, i, int.class))
.collect(Collectors.toList());

List<Integer> results = ctx.allOf(tasks).await();
int sum = results.stream().mapToInt(Integer::intValue).sum();
ctx.complete(sum);
})
.addOrchestrator(childOrchestratorName, ctx -> {
int input = ctx.getInput(int.class);
int result = ctx.callActivity(activityName, input, int.class).await();
ctx.complete(result);
})
.addActivity(activityName, ctx -> {
int val = ctx.getInput(int.class);
return val * val;
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, 0);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
// 1^2 + 2^2 + 3^2 + 4^2 + 5^2 = 1 + 4 + 9 + 16 + 25 = 55
assertEquals(55, instance.readOutputAs(int.class));
}
}

@Test
void subOrchestrationWithInstanceId() throws TimeoutException {
final String parentOrchestratorName = "ParentWithInstanceId";
final String childOrchestratorName = "ChildWithInstanceId";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(parentOrchestratorName, ctx -> {
String childInstanceId = ctx.getInstanceId() + ":child";
String result = ctx.callSubOrchestrator(
childOrchestratorName, "hello", childInstanceId, String.class).await();
ctx.complete(result);
})
.addOrchestrator(childOrchestratorName, ctx -> {
String input = ctx.getInput(String.class);
ctx.complete(input + " world");
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, "test");
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
assertEquals("hello world", instance.readOutputAs(String.class));
}
}

@Test
void continueAsNew() throws TimeoutException {
final String orchestratorName = "continueAsNew";
Expand Down
Loading
Loading