Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ You can use the following two techniques to write workflows that may need to sch
1. **Use the _continue-as-new_ API**:
Each workflow SDK exposes a _continue-as-new_ API that workflows can invoke to restart themselves with a new input and history. The _continue-as-new_ API is especially ideal for implementing "eternal workflows", like monitoring agents, which would otherwise be implemented using a `while (true)`-like construct. Using _continue-as-new_ is a great way to keep the workflow history size small.

{{% alert title="Note" color="primary" %}}
Continue-as-new restarts the workflow immediately and discards the results of any incomplete tasks - including activities, timers, and child workflows that were started but not awaited.
{{% /alert %}}

> The _continue-as-new_ API truncates the existing history, replacing it with a new history.

1. **Use child workflows**:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,25 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):

def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
# Add step-specific logic here (e.g. data transformation)
return activity_input + 1


def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
# Add step-specific logic here (e.g. data transformation)
return activity_input * 2


def step3(ctx, activity_input):
print(f'Step 3: Received input: {activity_input}.')
# Do some work
# Add step-specific logic here (e.g. data transformation)
return activity_input ^ 2


def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Apply some compensating work
# Add logic here to undo partial changes (e.g., release inventory)
```

> **Note** Workflow retry policies will be available in a future version of the Python SDK.
Expand Down Expand Up @@ -139,7 +139,7 @@ async function start() {
start().catch((e) => {
console.error(e);
process.exit(1);
# Apply custom compensation logic
// Add logic here to undo partial changes (e.g., refund payment)
});
```

Expand Down Expand Up @@ -201,7 +201,7 @@ public class ChainWorkflow extends Workflow {
public Object run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(Step1.class);
logger.info("Starting Activity: " + ctx.getName());
// Do some work
// Add step-specific logic here (e.g. data transformation)
return null;
}
}
Expand All @@ -212,7 +212,7 @@ public class ChainWorkflow extends Workflow {
public Object run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(Step2.class);
logger.info("Starting Activity: " + ctx.getName());
// Do some work
// Add step-specific logic here (e.g. data transformation)
return null;
}
}
Expand All @@ -223,7 +223,7 @@ public class ChainWorkflow extends Workflow {
public Object run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(Step3.class);
logger.info("Starting Activity: " + ctx.getName());
// Do some work
// Add step-specific logic here (e.g. data transformation)
return null;
}
}
Expand Down Expand Up @@ -739,6 +739,11 @@ Depending on the business needs, there may be a single monitor or there may be m

Dapr Workflow supports this pattern natively by allowing you to implement _eternal workflows_. Rather than writing infinite while-loops ([which is an anti-pattern]({{% ref "workflow-features-concepts.md#infinite-loops-and-eternal-workflows" %}})), Dapr Workflow exposes a _continue-as-new_ API that workflow authors can use to restart a workflow function from the beginning with a new input.

{{% alert title="Note" color="primary" %}}
_Continue-as-new_ proceeds immediately without waiting for child workflows that were started but not awaited. This allows the monitor pattern to continue its loop while child workflows are still running.
{{% /alert %}}


{{< tabpane text=true >}}

{{% tab "Python" %}}
Expand Down Expand Up @@ -795,12 +800,10 @@ const statusMonitorWorkflow: TWorkflow = async function* (ctx: WorkflowContext):
const status = yield ctx.callActivity(checkStatusActivity);
if (status === "healthy") {
// Check less frequently when in a healthy state
// set duration to 1 hour
duration = 60 * 60;
} else {
yield ctx.callActivity(alertActivity, "job unhealthy");
// Check more frequently when in an unhealthy state
// set duration to 5 minutes
duration = 5 * 60;
}

Expand Down Expand Up @@ -1099,7 +1102,7 @@ async function start() {
// Orders of $1000 or more require manager approval
yield ctx.callActivity(sendApprovalRequest, order);

// Approvals must be received within 24 hours or they will be cancled.
// Approvals must be received within 24 hours or they will be cancelled.
const tasks: Task<any>[] = [];
const approvalEvent = ctx.waitForExternalEvent("approval_received");
const timeoutEvent = ctx.createTimer(24 * 60 * 60);
Expand Down Expand Up @@ -1179,7 +1182,7 @@ start().catch((e) => {
```csharp
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
// ...(other steps)...
// Initial business logic (e.g. inventory check)

// Require orders over a certain threshold to be approved
if (order.TotalCost > OrderApprovalThreshold)
Expand All @@ -1206,7 +1209,7 @@ public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderP
}
}

// ...(other steps)...
// Final business logic (e.g. shipping confirmation)

// End the workflow with a success result
return new OrderResult(Processed: true);
Expand All @@ -1224,7 +1227,7 @@ public class ExternalSystemInteractionWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
// ...other steps...
// Initial business logic (e.g. inventory check)
Integer orderCost = ctx.getInput(int.class);
// Require orders over a certain threshold to be approved
if (orderCost > ORDER_APPROVAL_THRESHOLD) {
Expand All @@ -1242,7 +1245,7 @@ public class ExternalSystemInteractionWorkflow extends Workflow {
ctx.complete("Process cancel");
}
}
// ...other steps...
// Final business logic (e.g. shipping confirmation)

// End the workflow with a success result
ctx.complete("Process approved");
Expand Down
Loading