Skip to content

Commit 15b3218

Browse files
committed
Add versioning to sub orchestrations
Signed-off-by: Hal Spang <halspang@microsoft.com>
1 parent 6863dcc commit 15b3218

6 files changed

Lines changed: 153 additions & 4 deletions

File tree

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public void startAndBlock() {
117117
this.orchestrationFactories,
118118
this.dataConverter,
119119
this.maximumTimerInterval,
120-
logger);
120+
logger,
121+
this.versioningOptions);
121122
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
122123
this.activityFactories,
123124
this.dataConverter,
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.microsoft.durabletask;
2+
3+
/**
4+
* Options for starting a new sub-orchestration instance.
5+
*/
6+
public class NewSubOrchestrationInstanceOptions extends TaskOptions {
7+
8+
private String instanceId;
9+
private String version;
10+
11+
/**
12+
* Creates options with a retry policy for the sub-orchestration.
13+
* @param retryPolicy The retry policy to use for the sub-orchestration.
14+
*/
15+
public NewSubOrchestrationInstanceOptions(RetryPolicy retryPolicy) {
16+
super(retryPolicy);
17+
}
18+
19+
/**
20+
* Creates options with a retry handler for the sub-orchestration.
21+
* @param retryHandler The retry handler to use for the sub-orchestration.
22+
*/
23+
public NewSubOrchestrationInstanceOptions(RetryHandler retryHandler) {
24+
super(retryHandler);
25+
}
26+
27+
/**
28+
* Sets the version for the sub-orchestration instance.
29+
* @param version The version string to use.
30+
* @return This options object for chaining.
31+
*/
32+
public NewSubOrchestrationInstanceOptions setVersion(String version) {
33+
this.version = version;
34+
return this;
35+
}
36+
37+
/**
38+
* Gets the version for the sub-orchestration instance.
39+
* @return The version string, or null if not set.
40+
*/
41+
public String getVersion() {
42+
return version;
43+
}
44+
45+
/**
46+
* Sets a custom instance ID for the sub-orchestration.
47+
* @param instanceId The custom instance ID to use.
48+
* @return This options object for chaining.
49+
*/
50+
public NewSubOrchestrationInstanceOptions setInstanceId(String instanceId) {
51+
this.instanceId = instanceId;
52+
return this;
53+
}
54+
55+
/**
56+
* Gets the custom instance ID for the sub-orchestration.
57+
* @return The instance ID, or null if not set.
58+
*/
59+
public String getInstanceId() {
60+
return instanceId;
61+
}
62+
}

client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ public TaskOrchestration create() {
129129
orchestrationFactories,
130130
new JacksonDataConverter(),
131131
DEFAULT_MAXIMUM_TIMER_INTERVAL,
132-
logger);
132+
logger,
133+
null);
133134

134135
// TODO: Error handling
135136
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(

client/src/main/java/com/microsoft/durabletask/TaskOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
/**
66
* Options that can be used to control the behavior of orchestrator and activity task execution.
77
*/
8-
public final class TaskOptions {
8+
public class TaskOptions {
99
private final RetryPolicy retryPolicy;
1010
private final RetryHandler retryHandler;
1111

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,29 @@ final class TaskOrchestrationExecutor {
3030
private final DataConverter dataConverter;
3131
private final Logger logger;
3232
private final Duration maximumTimerInterval;
33+
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
3334

3435
public TaskOrchestrationExecutor(
3536
HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
3637
DataConverter dataConverter,
3738
Duration maximumTimerInterval,
38-
Logger logger) {
39+
Logger logger,
40+
DurableTaskGrpcWorkerVersioningOptions versioningOptions) {
3941
this.orchestrationFactories = orchestrationFactories;
4042
this.dataConverter = dataConverter;
4143
this.maximumTimerInterval = maximumTimerInterval;
4244
this.logger = logger;
45+
this.versioningOptions = versioningOptions;
4346
}
4447

4548
public TaskOrchestratorResult execute(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
4649
ContextImplTask context = new ContextImplTask(pastEvents, newEvents);
4750

51+
if (this.versioningOptions != null && this.versioningOptions.getDefaultVersion() != null) {
52+
// Set the default version for the orchestrator
53+
context.setDefaultVersion(this.versioningOptions.getDefaultVersion());
54+
}
55+
4856
boolean completed = false;
4957
try {
5058
// Play through the history events until either we've played through everything
@@ -82,6 +90,7 @@ private class ContextImplTask implements TaskOrchestrationContext {
8290
private boolean isReplaying = true;
8391
private int newUUIDCounter;
8492
private String version;
93+
private String defaultVersion;
8594

8695
// LinkedHashMap to maintain insertion order when returning the list of pending actions
8796
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
@@ -182,6 +191,15 @@ private void setVersion(String version) {
182191
this.version = version;
183192
}
184193

194+
private String getDefaultVersion() {
195+
return this.defaultVersion;
196+
}
197+
198+
private void setDefaultVersion(String defaultVersion) {
199+
// This is used when starting sub-orchestrations
200+
this.defaultVersion = defaultVersion;
201+
}
202+
185203
public <V> Task<V> completedTask(V value) {
186204
CompletableTask<V> task = new CompletableTask<>();
187205
task.complete(value);
@@ -389,6 +407,16 @@ public <V> Task<V> callSubOrchestrator(
389407
}
390408
createSubOrchestrationActionBuilder.setInstanceId(instanceId);
391409

410+
if (options instanceof NewSubOrchestrationInstanceOptions && ((NewSubOrchestrationInstanceOptions)options).getVersion() != null) {
411+
NewSubOrchestrationInstanceOptions subOrchestrationOptions = (NewSubOrchestrationInstanceOptions) options;
412+
if (subOrchestrationOptions.getVersion() != null) {
413+
createSubOrchestrationActionBuilder.setVersion(StringValue.of(subOrchestrationOptions.getVersion()));
414+
}
415+
} else if (this.getDefaultVersion() != null) {
416+
// If the options are not of the correct type, we still allow the version to be set
417+
createSubOrchestrationActionBuilder.setVersion(StringValue.of(this.getDefaultVersion()));
418+
}
419+
392420
TaskFactory<V> taskFactory = () -> {
393421
int id = this.sequenceNumber++;
394422
this.pendingActions.put(id, OrchestratorAction.newBuilder()

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1742,4 +1742,61 @@ void orchestrationVersionStrictMatchStrategy(String orchestrationVersion) {
17421742
throw new RuntimeException(e);
17431743
}
17441744
}
1745+
1746+
@ParameterizedTest
1747+
@ValueSource(strings = {"null", "", "0.9"})
1748+
public void versionsAppliedToSubOrchestration(String subOrchestrationVersion) {
1749+
final String orchestratorName = "VersionOrchestration";
1750+
final String subOrchestratorName = "subOrchestration";
1751+
final String activityName = "SayVersion";
1752+
final String defaultVersion = "1.0";
1753+
final DurableTaskGrpcWorkerVersioningOptions versioningOptions = new DurableTaskGrpcWorkerVersioningOptions(
1754+
"1.0", // Version of the worker.
1755+
defaultVersion, // Default version, used to assign versions to suborchestrations created by the worker.
1756+
DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.CURRENTOROLDER,
1757+
DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy.FAIL);
1758+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
1759+
.addOrchestrator(orchestratorName, ctx -> {
1760+
final NewSubOrchestrationInstanceOptions options = new NewSubOrchestrationInstanceOptions((RetryPolicy) null);
1761+
if (!subOrchestrationVersion.equals("null")) {
1762+
options.setVersion(subOrchestrationVersion);
1763+
}
1764+
String output = ctx.callSubOrchestrator(subOrchestratorName, null, null, options, String.class).await();
1765+
ctx.complete(output);
1766+
})
1767+
.addOrchestrator(subOrchestratorName, ctx -> {
1768+
String version = ctx.getVersion();
1769+
String output = ctx.callActivity(activityName, version, String.class).await();
1770+
ctx.complete(output);
1771+
})
1772+
.addActivity(activityName, ctx -> {
1773+
return String.format("Version: %s", ctx.getInput(String.class));
1774+
})
1775+
.useVersioning(versioningOptions)
1776+
.buildAndStart();
1777+
1778+
DurableTaskClient client = this.createClientBuilder()
1779+
.defaultVersion(defaultVersion)
1780+
.build();
1781+
try (worker; client) {
1782+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
1783+
OrchestrationMetadata instance = client.waitForInstanceCompletion(
1784+
instanceId,
1785+
defaultTimeout,
1786+
true);
1787+
1788+
assertNotNull(instance);
1789+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
1790+
String output = instance.readOutputAs(String.class);
1791+
String expected = "";
1792+
if (subOrchestrationVersion.equals("null")) {
1793+
expected = String.format("Version: %s", defaultVersion);
1794+
} else {
1795+
expected = String.format("Version: %s", subOrchestrationVersion);
1796+
}
1797+
assertEquals(expected, output);
1798+
} catch (TimeoutException e) {
1799+
throw new RuntimeException(e);
1800+
}
1801+
}
17451802
}

0 commit comments

Comments
 (0)