Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions chasm/lib/nexusoperation/cancellation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ import (
var _ chasm.Component = (*Cancellation)(nil)
var _ chasm.StateMachine[nexusoperationpb.CancellationStatus] = (*Cancellation)(nil)

// Cancellation is a CHASM component that represents a pending cancellation of a Nexus operation.
//
// The Status field promoted from the embedded CancellationState is what the
// LifecycleState, StateMachineState and SetStateMachineState methods read and write.
type Cancellation struct {
// Embedded as part of satisfying chasm.Component (see the interface assertion
// in this file); presumably supplies default method implementations — confirm
// against the chasm package.
chasm.UnimplementedComponent

// Persisted internal state. Embedded by pointer, so its fields (e.g. Status)
// are promoted onto Cancellation; a nil pointer here makes any promoted-field
// access panic.
*nexusoperationpb.CancellationState
}

func NewCancellation() *Cancellation {
return &Cancellation{}
func newCancellation(state *nexusoperationpb.CancellationState) *Cancellation {
return &Cancellation{CancellationState: state}
}

// LifecycleState maps the cancellation's status to a CHASM lifecycle state.
func (o *Cancellation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
switch o.Status {
case nexusoperationpb.CANCELLATION_STATUS_SUCCEEDED:
Expand All @@ -31,10 +33,12 @@ func (o *Cancellation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
}
}

// StateMachineState reports the status currently held in the cancellation's
// persisted state.
func (o *Cancellation) StateMachineState() nexusoperationpb.CancellationStatus {
	return o.CancellationState.Status
}

// SetStateMachineState overwrites the status held in the cancellation's
// persisted state with the given value.
func (o *Cancellation) SetStateMachineState(status nexusoperationpb.CancellationStatus) {
	o.CancellationState.Status = status
}
4 changes: 2 additions & 2 deletions chasm/lib/nexusoperation/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.temporal.io/server/common/rpc/interceptor"
)

var ChasmNexusEnabled = dynamicconfig.NewGlobalBoolSetting(
var ChasmNexusEnabled = dynamicconfig.NewNamespaceBoolSetting(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to namespace scope to match ChasmEnabled.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call!

"nexusoperation.enableChasm",
false,
`Feature flag that controls whether the legacy HSM-based implementation (when flag is false; default) or the newer
Expand Down Expand Up @@ -151,7 +151,7 @@ Added for safety. Defaults to true. Likely to be removed in future server versio
type Config struct {
Enabled dynamicconfig.BoolPropertyFn
ChasmEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
ChasmNexusEnabled dynamicconfig.BoolPropertyFn
ChasmNexusEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down
39 changes: 39 additions & 0 deletions chasm/lib/nexusoperation/operation.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package nexusoperation

import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/chasm"
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
)

var _ chasm.Component = (*Operation)(nil)
var _ chasm.StateMachine[nexusoperationpb.OperationStatus] = (*Operation)(nil)

// Sentinel errors returned by Operation.Cancel. Both are failed-precondition
// service errors; callers can match them with errors.Is.
var (
	// ErrCancellationAlreadyRequested is returned when the operation already
	// has a Cancellation child, i.e. cancellation was requested before.
	ErrCancellationAlreadyRequested = serviceerror.NewFailedPrecondition("cancellation already requested")

	// ErrOperationAlreadyCompleted is returned when trying to cancel an
	// operation that can no longer transition to the canceled status.
	ErrOperationAlreadyCompleted = serviceerror.NewFailedPrecondition("operation already completed")
)

// OperationStore is the type referenced by an Operation's Store parent pointer.
// It is declared as any, so it places no method requirements on the parent
// component it points to.
type OperationStore any

// Operation is a CHASM component that represents a Nexus operation.
type Operation struct {
chasm.UnimplementedComponent

Expand All @@ -19,12 +27,17 @@ type Operation struct {
// Pointer to an implementation of the "store". For a workflow-based Nexus operation
// this is a parent pointer back to the workflow. For a standalone Nexus operation this is nil.
Store chasm.ParentPtr[OperationStore]

// Cancellation is a child component that manages sending the cancel request to the Nexus endpoint.
Cancellation chasm.Field[*Cancellation]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing doc string.

}

// NewOperation builds an Operation component backed by the supplied persisted
// state. The state pointer is stored as-is (not copied); all other fields are
// left at their zero values.
func NewOperation(state *nexusoperationpb.OperationState) *Operation {
	op := new(Operation)
	op.OperationState = state
	return op
}

// LifecycleState maps the operation's status to a CHASM lifecycle state.
func (o *Operation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
switch o.Status {
case nexusoperationpb.OPERATION_STATUS_SUCCEEDED:
Expand All @@ -38,10 +51,36 @@ func (o *Operation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
}
}

// StateMachineState reports the status currently held in the operation's
// persisted state.
func (o *Operation) StateMachineState() nexusoperationpb.OperationStatus {
	return o.OperationState.Status
}

// SetStateMachineState overwrites the status held in the operation's persisted
// state with the given value.
func (o *Operation) SetStateMachineState(status nexusoperationpb.OperationStatus) {
	o.OperationState.Status = status
}

// Cancel requests cancellation of the operation. It creates a Cancellation child component and, if the
// operation has already started, schedules the cancellation request to be sent to the Nexus endpoint.
//
// requestedEventID is stored on the new child's CancellationState as RequestedEventId.
//
// Cancel returns ErrOperationAlreadyCompleted when TransitionCanceled is not possible from the
// operation's current status, and ErrCancellationAlreadyRequested when a Cancellation child is
// already present. Otherwise it returns the result of applying transitionCancellationScheduled
// (only when the operation status is STARTED) or nil.
func (o *Operation) Cancel(ctx chasm.MutableContext, requestedEventID int64) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document all exported members.

// Reject the request if the operation can no longer move to the canceled status.
if !TransitionCanceled.Possible(o) {
return ErrOperationAlreadyCompleted
}
// An existing Cancellation child means cancellation was already requested.
if _, ok := o.Cancellation.TryGet(ctx); ok {
return ErrCancellationAlreadyRequested
}

// Record the request as a child component, remembering the caller-supplied event ID.
cancellation := newCancellation(&nexusoperationpb.CancellationState{
RequestedEventId: requestedEventID,
})
o.Cancellation = chasm.NewComponentField(ctx, cancellation)

// Once started, the handler returns a token that can be used in the cancellation request.
// Until then, there is no need to schedule the cancellation; the child is attached but
// no transition is applied here.
if o.Status == nexusoperationpb.OPERATION_STATUS_STARTED {
Comment thread
stephanos marked this conversation as resolved.
return transitionCancellationScheduled.Apply(cancellation, ctx, EventCancellationScheduled{})
}

return nil
}
19 changes: 9 additions & 10 deletions chasm/lib/nexusoperation/operation_statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ var transitionSucceeded = chasm.NewTransition(
},
nexusoperationpb.OPERATION_STATUS_SUCCEEDED,
func(o *Operation, ctx chasm.MutableContext, event EventSucceeded) error {
// Terminal state - no tasks to emit
// The component will be deleted after this transition
// Terminal state - no tasks to emit.
// The component will be deleted after this transition.
return nil
},
)
Expand All @@ -179,9 +179,8 @@ var transitionFailed = chasm.NewTransition(
},
nexusoperationpb.OPERATION_STATUS_FAILED,
func(o *Operation, ctx chasm.MutableContext, event EventFailed) error {
// Terminal state - no tasks to emit
// Not recording the last attempt information here since the state machine
// will be deleted immediately after this transition
// Terminal state - no tasks to emit.
// The component will be deleted after this transition.
return nil
},
)
Expand All @@ -190,16 +189,16 @@ var transitionFailed = chasm.NewTransition(
type EventCanceled struct {
}

var transitionCanceled = chasm.NewTransition(
var TransitionCanceled = chasm.NewTransition(
[]nexusoperationpb.OperationStatus{
nexusoperationpb.OPERATION_STATUS_SCHEDULED,
nexusoperationpb.OPERATION_STATUS_STARTED,
nexusoperationpb.OPERATION_STATUS_BACKING_OFF,
},
nexusoperationpb.OPERATION_STATUS_CANCELED,
func(o *Operation, ctx chasm.MutableContext, event EventCanceled) error {
// Terminal state - no tasks to emit
// The component will be deleted after this transition
// Terminal state - no tasks to emit.
// The component will be deleted after this transition.
return nil
},
)
Expand All @@ -216,8 +215,8 @@ var transitionTimedOut = chasm.NewTransition(
},
nexusoperationpb.OPERATION_STATUS_TIMED_OUT,
func(o *Operation, ctx chasm.MutableContext, event EventTimedOut) error {
// Terminal state - no tasks to emit
// The component will be deleted after this transition
// Terminal state - no tasks to emit.
// The component will be deleted after this transition.
return nil
},
)
2 changes: 1 addition & 1 deletion chasm/lib/nexusoperation/operation_statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func TestTransitionCanceled(t *testing.T) {

event := EventCanceled{}

err := transitionCanceled.Apply(operation, ctx, event)
err := TransitionCanceled.Apply(operation, ctx, event)
require.NoError(t, err)

require.Equal(t, nexusoperationpb.OPERATION_STATUS_CANCELED, operation.Status)
Expand Down
88 changes: 84 additions & 4 deletions chasm/lib/nexusoperation/workflow/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func registerCommandHandlers(
}
return registry.Register(
enumspb.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION,
handleCancelCommand,
h.handleCancelCommand,
)
}

Expand All @@ -68,6 +68,10 @@ func (ch *commandHandler) handleScheduleCommand(
}
}

if !ch.config.ChasmNexusEnabled(nsName) {
return command.ErrNotSupported
}

attrs := cmd.GetScheduleNexusOperationCommandAttributes()
if attrs == nil {
return command.FailWorkflowTaskError{
Expand Down Expand Up @@ -286,13 +290,89 @@ func (ch *commandHandler) handleScheduleCommand(
return nil
}

func handleCancelCommand(
func (ch *commandHandler) handleCancelCommand(
chasmCtx chasm.MutableContext,
wf *chasmworkflow.Workflow,
validator command.Validator,
cmd *commandpb.Command,
opts command.HandlerOptions,
) error {
// TODO: Implement CHASM nexus operation cancellation
return serviceerror.NewUnimplemented("CHASM nexus operation cancellation not yet implemented")
if !ch.config.Enabled() {
return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_FEATURE_DISABLED,
Message: "Nexus operations disabled",
}
}

nsName := chasmCtx.NamespaceEntry().Name().String()
if !ch.config.ChasmNexusEnabled(nsName) {
return command.ErrNotSupported
}

attrs := cmd.GetRequestCancelNexusOperationCommandAttributes()
if attrs == nil {
return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: "empty CancelNexusOperationCommandAttributes",
}
}

key := strconv.FormatInt(attrs.ScheduledEventId, 10)
operationField, operationFound := wf.Operations[key]
hasBufferedEvent := func() bool {
return wf.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId))
}

if !operationFound && !hasBufferedEvent() {
return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId),
}
}

// Always create the event even if there's a buffered completion to avoid breaking replay in the SDK.
// The event will be applied before the completion since buffered events are reordered and put at the end of the
// batch, after command events from the workflow task.
event := wf.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, func(he *historypb.HistoryEvent) {
he.Attributes = &historypb.HistoryEvent_NexusOperationCancelRequestedEventAttributes{
NexusOperationCancelRequestedEventAttributes: &historypb.NexusOperationCancelRequestedEventAttributes{
ScheduledEventId: attrs.ScheduledEventId,
WorkflowTaskCompletedEventId: opts.WorkflowTaskCompletedEventID,
},
}
he.UserMetadata = cmd.UserMetadata
})

if !operationFound {
// Operation not found but there's a buffered terminal event. The workflow couldn't know
// the operation completed while its task was in flight. Ignore.
return nil
}

op := operationField.Get(chasmCtx)
err := op.Cancel(chasmCtx, event.GetEventId())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self. Ideally all of the event ID tracking should be done in a location that is agnostic to where the component is embedded. Figure out a way to keep track of this in a way that the operation logic isn't directly affected.

if errors.Is(err, nexusoperation.ErrCancellationAlreadyRequested) {
return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("cancelation was already requested for an operation with scheduled event ID %d", attrs.ScheduledEventId),
}
}
return err
}

// makeNexusOperationTerminalEventFilter builds a predicate over history events.
// The predicate reports true only for a Nexus operation completed, failed,
// canceled, or timed-out event whose attributes carry the given scheduled
// event ID; every other event yields false.
func makeNexusOperationTerminalEventFilter(scheduledEventID int64) func(event *historypb.HistoryEvent) bool {
	return func(event *historypb.HistoryEvent) bool {
		// Pull the scheduled event ID out of whichever terminal attribute set
		// matches the event type, then compare once at the end.
		var candidateID int64
		switch event.EventType {
		case enumspb.EVENT_TYPE_NEXUS_OPERATION_COMPLETED:
			candidateID = event.GetNexusOperationCompletedEventAttributes().GetScheduledEventId()
		case enumspb.EVENT_TYPE_NEXUS_OPERATION_FAILED:
			candidateID = event.GetNexusOperationFailedEventAttributes().GetScheduledEventId()
		case enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCELED:
			candidateID = event.GetNexusOperationCanceledEventAttributes().GetScheduledEventId()
		case enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT:
			candidateID = event.GetNexusOperationTimedOutEventAttributes().GetScheduledEventId()
		default:
			// Not one of the terminal Nexus operation event types.
			return false
		}
		return candidateID == scheduledEventID
	}
}
Loading
Loading