-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Nexus CancelCommand in CHASM #9288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5b636df
b7ee27c
4a60898
1a142c4
a4ead56
3366370
e7f6712
8adfd60
f2c7d57
a7528b1
e085cea
8beb81c
3e79c24
64087d4
6e9792d
5697882
6ce2cd9
57a1fbe
48d314e
dc2c7d7
fab4cf0
864ad11
f34d2e5
181ddc1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,23 @@ | ||
| package nexusoperation | ||
|
|
||
| import ( | ||
| "go.temporal.io/api/serviceerror" | ||
| "go.temporal.io/server/chasm" | ||
| nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" | ||
| ) | ||
|
|
||
| var _ chasm.Component = (*Operation)(nil) | ||
| var _ chasm.StateMachine[nexusoperationpb.OperationStatus] = (*Operation)(nil) | ||
|
|
||
| // ErrCancellationAlreadyRequested is returned when a cancellation has already been requested for an operation. | ||
| var ErrCancellationAlreadyRequested = serviceerror.NewFailedPrecondition("cancellation already requested") | ||
|
|
||
| // ErrOperationAlreadyCompleted is returned when trying to cancel an operation that has already completed. | ||
| var ErrOperationAlreadyCompleted = serviceerror.NewFailedPrecondition("operation already completed") | ||
|
|
||
| type OperationStore any | ||
|
|
||
| // Operation is a CHASM component that represents a Nexus operation. | ||
| type Operation struct { | ||
| chasm.UnimplementedComponent | ||
|
|
||
|
|
@@ -19,12 +27,17 @@ type Operation struct { | |
| // Pointer to an implementation of the "store". For a workflow-based Nexus operation | ||
| // this is a parent pointer back to the workflow. For a standalone Nexus operation this is nil. | ||
| Store chasm.ParentPtr[OperationStore] | ||
|
|
||
| // Cancellation is a child component that manages sending the cancel request to the Nexus endpoint. | ||
| Cancellation chasm.Field[*Cancellation] | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing doc string. |
||
| } | ||
|
|
||
| // NewOperation creates a new Operation component with the given persisted state. | ||
| func NewOperation(state *nexusoperationpb.OperationState) *Operation { | ||
| return &Operation{OperationState: state} | ||
| } | ||
|
|
||
| // LifecycleState maps the operation's status to a CHASM lifecycle state. | ||
| func (o *Operation) LifecycleState(_ chasm.Context) chasm.LifecycleState { | ||
| switch o.Status { | ||
| case nexusoperationpb.OPERATION_STATUS_SUCCEEDED: | ||
|
|
@@ -38,10 +51,36 @@ func (o *Operation) LifecycleState(_ chasm.Context) chasm.LifecycleState { | |
| } | ||
| } | ||
|
|
||
| // StateMachineState returns the current operation status. | ||
| func (o *Operation) StateMachineState() nexusoperationpb.OperationStatus { | ||
| return o.Status | ||
| } | ||
|
|
||
| // SetStateMachineState sets the operation status. | ||
| func (o *Operation) SetStateMachineState(status nexusoperationpb.OperationStatus) { | ||
| o.Status = status | ||
| } | ||
|
|
||
| // Cancel requests cancellation of the operation. It creates a Cancellation child component and, if the | ||
| // operation has already started, schedules the cancellation request to be sent to the Nexus endpoint. | ||
| func (o *Operation) Cancel(ctx chasm.MutableContext, requestedEventID int64) error { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please document all exported members. |
||
| if !TransitionCanceled.Possible(o) { | ||
| return ErrOperationAlreadyCompleted | ||
| } | ||
| if _, ok := o.Cancellation.TryGet(ctx); ok { | ||
| return ErrCancellationAlreadyRequested | ||
| } | ||
|
|
||
| cancellation := newCancellation(&nexusoperationpb.CancellationState{ | ||
| RequestedEventId: requestedEventID, | ||
| }) | ||
| o.Cancellation = chasm.NewComponentField(ctx, cancellation) | ||
|
|
||
| // Once started, the handler returns a token that can be used in the cancelation request. | ||
| // Until then, no need to schedule the cancelation. | ||
| if o.Status == nexusoperationpb.OPERATION_STATUS_STARTED { | ||
|
stephanos marked this conversation as resolved.
|
||
| return transitionCancellationScheduled.Apply(cancellation, ctx, EventCancellationScheduled{}) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,7 +46,7 @@ func registerCommandHandlers( | |
| } | ||
| return registry.Register( | ||
| enumspb.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION, | ||
| handleCancelCommand, | ||
| h.handleCancelCommand, | ||
| ) | ||
| } | ||
|
|
||
|
|
@@ -68,6 +68,10 @@ func (ch *commandHandler) handleScheduleCommand( | |
| } | ||
| } | ||
|
|
||
| if !ch.config.ChasmNexusEnabled(nsName) { | ||
| return command.ErrNotSupported | ||
| } | ||
|
|
||
| attrs := cmd.GetScheduleNexusOperationCommandAttributes() | ||
| if attrs == nil { | ||
| return command.FailWorkflowTaskError{ | ||
|
|
@@ -286,13 +290,89 @@ func (ch *commandHandler) handleScheduleCommand( | |
| return nil | ||
| } | ||
|
|
||
| func handleCancelCommand( | ||
| func (ch *commandHandler) handleCancelCommand( | ||
| chasmCtx chasm.MutableContext, | ||
| wf *chasmworkflow.Workflow, | ||
| validator command.Validator, | ||
| cmd *commandpb.Command, | ||
| opts command.HandlerOptions, | ||
| ) error { | ||
| // TODO: Implement CHASM nexus operation cancellation | ||
| return serviceerror.NewUnimplemented("CHASM nexus operation cancellation not yet implemented") | ||
| if !ch.config.Enabled() { | ||
| return command.FailWorkflowTaskError{ | ||
| Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_FEATURE_DISABLED, | ||
| Message: "Nexus operations disabled", | ||
| } | ||
| } | ||
|
|
||
| nsName := chasmCtx.NamespaceEntry().Name().String() | ||
| if !ch.config.ChasmNexusEnabled(nsName) { | ||
| return command.ErrNotSupported | ||
| } | ||
|
|
||
| attrs := cmd.GetRequestCancelNexusOperationCommandAttributes() | ||
| if attrs == nil { | ||
| return command.FailWorkflowTaskError{ | ||
| Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES, | ||
| Message: "empty CancelNexusOperationCommandAttributes", | ||
| } | ||
| } | ||
|
|
||
| key := strconv.FormatInt(attrs.ScheduledEventId, 10) | ||
| operationField, operationFound := wf.Operations[key] | ||
| hasBufferedEvent := func() bool { | ||
| return wf.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) | ||
| } | ||
|
|
||
| if !operationFound && !hasBufferedEvent() { | ||
| return command.FailWorkflowTaskError{ | ||
| Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES, | ||
| Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId), | ||
| } | ||
| } | ||
|
|
||
| // Always create the event even if there's a buffered completion to avoid breaking replay in the SDK. | ||
| // The event will be applied before the completion since buffered events are reordered and put at the end of the | ||
| // batch, after command events from the workflow task. | ||
| event := wf.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, func(he *historypb.HistoryEvent) { | ||
| he.Attributes = &historypb.HistoryEvent_NexusOperationCancelRequestedEventAttributes{ | ||
| NexusOperationCancelRequestedEventAttributes: &historypb.NexusOperationCancelRequestedEventAttributes{ | ||
| ScheduledEventId: attrs.ScheduledEventId, | ||
| WorkflowTaskCompletedEventId: opts.WorkflowTaskCompletedEventID, | ||
| }, | ||
| } | ||
| he.UserMetadata = cmd.UserMetadata | ||
| }) | ||
|
|
||
| if !operationFound { | ||
| // Operation not found but there's a buffered terminal event. The workflow couldn't know | ||
| // the operation completed while its task was in flight. Ignore. | ||
| return nil | ||
| } | ||
|
|
||
| op := operationField.Get(chasmCtx) | ||
| err := op.Cancel(chasmCtx, event.GetEventId()) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self. Ideally all of the event ID tracking should be done in a location that is agnostic to where the component is embedded. Figure out a way to keep track of this in a way that the operation logic isn't directly affected. |
||
| if errors.Is(err, nexusoperation.ErrCancellationAlreadyRequested) { | ||
| return command.FailWorkflowTaskError{ | ||
| Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES, | ||
| Message: fmt.Sprintf("cancelation was already requested for an operation with scheduled event ID %d", attrs.ScheduledEventId), | ||
| } | ||
| } | ||
| return err | ||
| } | ||
|
|
||
| func makeNexusOperationTerminalEventFilter(scheduledEventID int64) func(event *historypb.HistoryEvent) bool { | ||
| return func(event *historypb.HistoryEvent) bool { | ||
| switch event.EventType { | ||
| case enumspb.EVENT_TYPE_NEXUS_OPERATION_COMPLETED: | ||
| return event.GetNexusOperationCompletedEventAttributes().GetScheduledEventId() == scheduledEventID | ||
| case enumspb.EVENT_TYPE_NEXUS_OPERATION_FAILED: | ||
| return event.GetNexusOperationFailedEventAttributes().GetScheduledEventId() == scheduledEventID | ||
| case enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCELED: | ||
| return event.GetNexusOperationCanceledEventAttributes().GetScheduledEventId() == scheduledEventID | ||
| case enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT: | ||
| return event.GetNexusOperationTimedOutEventAttributes().GetScheduledEventId() == scheduledEventID | ||
| default: | ||
| return false | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to namespace scope to match
ChasmEnabled.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call!