Skip to content

Nexus ScheduleCommand in CHASM#9278

Open
stephanos wants to merge 2 commits intotemporalio:nexus/hsm-to-chasm-migrationfrom
stephanos:nexus-chasm-cmd
Open

Nexus ScheduleCommand in CHASM#9278
stephanos wants to merge 2 commits intotemporalio:nexus/hsm-to-chasm-migrationfrom
stephanos:nexus-chasm-cmd

Conversation

@stephanos
Copy link
Contributor

@stephanos stephanos commented Feb 10, 2026

What changed?

Ported command handler for Nexus "schedule" command from HSM to CHASM.

Why?

CHASM migration.

How did you test it?

  • built
  • run locally and tested manually
  • covered by existing tests
  • added new unit test(s)
  • added new functional test(s)

@stephanos stephanos changed the title Nexus chasm cmd Nexus ScheduleCommand in CHASM Feb 10, 2026
@stephanos stephanos force-pushed the nexus-chasm-cmd branch 2 times, most recently from 248e7e1 to 321adf6 Compare February 10, 2026 19:32
// ComponentOptions(Component) []ComponentOption

// GetContext returns the underlying context.Context for use in I/O operations (e.g., RPC calls).
GetContext() context.Context
Copy link
Contributor Author

@stephanos stephanos Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This had to become exported because of the h.endpointRegistry.GetByName call in the command handler. I've considered a few alternative options (e.g. adding ctx to the handler interface, adding GetNexusEndpoint to MutableContext ...); but this seemed like the best in the end. Happy to discuss!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want this exposed in other cases. The command handler is an API handler that should have access to a Go context. I would rather expose the endpoint registry via the context instead.


if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToStartTimeout); err != nil {
return workflow.FailWorkflowTaskError{
return chasmcommand.FailWorkflowTaskError{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This had moved in the last PR.

}

var transitionScheduled = chasm.NewTransition(
var TransitionScheduled = chasm.NewTransition(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required to be accessible from chasm/lib/nexusoperation/workflow/commands.go

@stephanos stephanos force-pushed the nexus-chasm-cmd branch 2 times, most recently from 1abb55d to 23e350b Compare February 10, 2026 20:03
google.protobuf.Timestamp next_attempt_schedule_time = 7;
temporal.api.failure.v1.Failure last_attempt_failure = 8;
int64 scheduled_event_id = 9;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modelled after NexusOperationInfo in proto/internal/temporal/server/api/persistence/v1/executions.proto.


Registry *command.Registry
Config *nexusoperation.Config
EndpointRegistry commonnexus.EndpointRegistry `optional:"true"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, matching doesn't start.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because we are registering the workflow lib in matching unnecessarily. Let's avoid that.

@stephanos stephanos force-pushed the nexus-chasm-cmd branch 3 times, most recently from 5ad0b10 to 57669e0 Compare February 10, 2026 21:05
)
}

func (ch *commandHandler) handleScheduleCommand(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

migrated from components/nexusoperations/workflow/commands.go

Copy link
Contributor Author

@stephanos stephanos Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

diff
--- components/nexusoperations/workflow/commands.go
+++ chasm/lib/nexusoperation/workflow/commands.go
@@ -1,38 +1,38 @@
-func (ch *commandHandler) HandleScheduleCommand(
-	ctx context.Context,
-	ms historyi.MutableState,
-	validator chasmcommand.Validator,
-	workflowTaskCompletedEventID int64,
-	command *commandpb.Command,
+func (ch *commandHandler) handleScheduleCommand(
+	chasmCtx chasm.MutableContext,
+	wf *workflow.Workflow,
+	validator command.Validator,
+	cmd *commandpb.Command,
+	opts command.HandlerOptions,
 ) error {
-	ns := ms.GetNamespaceEntry()
-	nsName := ms.GetNamespaceEntry().Name().String()
+	ns := chasmCtx.GetNamespaceEntry()
+	nsName := ns.Name().String()
 
 	if !ch.config.Enabled() {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_FEATURE_DISABLED,
 			Message: "Nexus operations disabled",
 		}
 	}
 
-	attrs := command.GetScheduleNexusOperationCommandAttributes()
+	attrs := cmd.GetScheduleNexusOperationCommandAttributes()
 	if attrs == nil {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: "empty ScheduleNexusOperationCommandAttributes",
 		}
 	}
 
 	var endpointID string
-	endpoint, err := ch.endpointRegistry.GetByName(ctx, ns.ID(), attrs.Endpoint)
+	endpoint, err := ch.endpointRegistry.GetByName(chasmCtx.GetContext(), ns.ID(), attrs.Endpoint)
 	if err != nil {
 		if errors.As(err, new(*serviceerror.NotFound)) {
-			return chasmcommand.FailWorkflowTaskError{
+			return command.FailWorkflowTaskError{
 				Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 				Message: fmt.Sprintf("endpoint %q not found", attrs.Endpoint),
 			}
 		} else if errors.As(err, new(*serviceerror.PermissionDenied)) {
-			return chasmcommand.FailWorkflowTaskError{
+			return command.FailWorkflowTaskError{
 				Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 				Message: fmt.Sprintf("caller namespace %q unauthorized for %q", ns.Name(), attrs.Endpoint),
 			}
@@ -44,7 +44,7 @@
 	}
 
 	if len(attrs.Service) > ch.config.MaxServiceNameLength(nsName) {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.Service exceeds length limit of %d",
@@ -54,7 +54,7 @@
 	}
 
 	if len(attrs.Operation) > ch.config.MaxOperationNameLength(nsName) {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.Operation exceeds length limit of %d",
@@ -64,7 +64,7 @@
 	}
 
 	if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToCloseTimeout); err != nil {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.ScheduleToCloseTimeout is invalid: %v", err),
@@ -72,7 +72,7 @@
 	}
 
 	if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToStartTimeout); err != nil {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.ScheduleToStartTimeout is invalid: %v", err),
@@ -80,7 +80,7 @@
 	}
 
 	if err := timestamp.ValidateAndCapProtoDuration(attrs.StartToCloseTimeout); err != nil {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.StartToCloseTimeout is invalid: %v", err),
@@ -88,7 +88,7 @@
 	}
 
 	if !validator.IsValidPayloadSize(attrs.Input.Size()) {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause:             enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message:           "ScheduleNexusOperationCommandAttributes.Input exceeds size limit",
 			TerminateWorkflow: true,
@@ -102,7 +102,7 @@
 		lowerCaseHeader[lowerK] = v
 		headerLength += len(lowerK) + len(v)
 		if slices.Contains(ch.config.DisallowedOperationHeaders(), lowerK) {
-			return chasmcommand.FailWorkflowTaskError{
+			return command.FailWorkflowTaskError{
 				Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 				Message: fmt.Sprintf("ScheduleNexusOperationCommandAttributes.NexusHeader contains a disallowed header key: %q", k),
 			}
@@ -110,28 +110,26 @@
 	}
 
 	if headerLength > ch.config.MaxOperationHeaderSize(nsName) {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: "ScheduleNexusOperationCommandAttributes.NexusHeader exceeds size limit",
 		}
 	}
 
-	root := ms.HSM()
-	coll := nexusoperations.MachineCollection(root)
 	maxPendingOperations := ch.config.MaxConcurrentOperations(nsName)
-	if coll.Size() >= ch.config.MaxConcurrentOperations(nsName) {
-		return chasmcommand.FailWorkflowTaskError{
+	if wf.PendingNexusOperationCount() >= maxPendingOperations {
+		return command.FailWorkflowTaskError{
 			Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_NEXUS_OPERATIONS_LIMIT_EXCEEDED,
 			Message: fmt.Sprintf("workflow has reached the pending nexus operation limit of %d for this namespace", maxPendingOperations),
 		}
 	}
 
 	// Trim timeout to workflow run timeout.
-	runTimeout := ms.GetExecutionInfo().WorkflowRunTimeout.AsDuration()
+	runTimeout := wf.WorkflowRunTimeout()
 	opTimeout := attrs.ScheduleToCloseTimeout.AsDuration()
 	if runTimeout > 0 && (opTimeout == 0 || opTimeout > runTimeout) {
-		attrs.ScheduleToCloseTimeout = ms.GetExecutionInfo().WorkflowRunTimeout
-		opTimeout = attrs.ScheduleToCloseTimeout.AsDuration()
+		attrs.ScheduleToCloseTimeout = durationpb.New(runTimeout)
+		opTimeout = runTimeout
 	}
 
 	// Trim timeout to max allowed timeout.
@@ -153,7 +151,8 @@
 		}
 	}
 
-	event := ms.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) {
+	requestID := uuid.NewString()
+	event := chasmCtx.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) {
 		he.Attributes = &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{
 			NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{
 				Endpoint:                     attrs.Endpoint,
@@ -165,12 +164,35 @@
 				ScheduleToStartTimeout:       attrs.ScheduleToStartTimeout,
 				StartToCloseTimeout:          attrs.StartToCloseTimeout,
 				NexusHeader:                  lowerCaseHeader,
-				RequestId:                    uuid.NewString(),
-				WorkflowTaskCompletedEventId: workflowTaskCompletedEventID,
+				RequestId:                    requestID,
+				WorkflowTaskCompletedEventId: opts.WorkflowTaskCompletedEventID,
 			},
 		}
-		he.UserMetadata = command.UserMetadata
+		he.UserMetadata = cmd.UserMetadata
 	})
 
-	return nexusoperations.ScheduledEventDefinition{}.Apply(root, event)
+	scheduledTime := event.GetEventTime()
+	if scheduledTime == nil {
+		scheduledTime = timestamppb.Now()
+	}
+
+	op := nexusoperation.NewOperation(&nexusoperationpb.OperationState{
+		EndpointId:             endpointID,
+		Endpoint:               attrs.Endpoint,
+		Service:                attrs.Service,
+		Operation:              attrs.Operation,
+		ScheduledTime:          scheduledTime,
+		ScheduleToCloseTimeout: attrs.ScheduleToCloseTimeout,
+		RequestId:              requestID,
+		Attempt:                1,
+	})
+
+	if err := nexusoperation.TransitionScheduled.Apply(op, chasmCtx, nexusoperation.EventScheduled{}); err != nil {
+		return err
+	}
+
+	key := strconv.FormatInt(event.GetEventId(), 10)
+	wf.AddNexusOperation(chasmCtx, key, op)
+
+	return nil
 }

@stephanos stephanos force-pushed the nexus-chasm-cmd branch 3 times, most recently from 5175002 to 6d1d649 Compare February 10, 2026 22:42

option go_package = "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb;nexusoperationpb";

message OperationState {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (w *Workflow) PendingNexusOperationCount() int {
return len(w.Operations)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ Technically these are not needed as Operations is exported; but it's my understanding that that is for technical reasons only and this makes for better encapsulation.

@stephanos stephanos force-pushed the nexus-chasm-cmd branch 3 times, most recently from 47827cc to e0f25c7 Compare February 10, 2026 23:59
// worker request.
type Handler func(
chasmCtx chasm.MutableContext,
wf *chasmworkflow.Workflow,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to change the Handler interface to gain access to workflow's Operations. I've considered a few options here and this one felt the most natural. Happy to discuss.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would define this as an interface to prevent the circular dependency issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other approaches too. Because you already have the workflow in a parent pointer, you could potentially use that.

@@ -0,0 +1,880 @@
package workflow
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ported from components/nexusoperations/workflow/commands_test.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

diff
--- components/nexusoperations/workflow/commands.go
+++ chasm/lib/nexusoperation/workflow/commands.go
@@ -1,38 +1,38 @@
-func (ch *commandHandler) HandleScheduleCommand(
-	ctx context.Context,
-	ms historyi.MutableState,
-	validator chasmcommand.Validator,
-	workflowTaskCompletedEventID int64,
-	command *commandpb.Command,
+func (ch *commandHandler) handleScheduleCommand(
+	chasmCtx chasm.MutableContext,
+	wf *workflow.Workflow,
+	validator command.Validator,
+	cmd *commandpb.Command,
+	opts command.HandlerOptions,
 ) error {
-	ns := ms.GetNamespaceEntry()
-	nsName := ms.GetNamespaceEntry().Name().String()
+	ns := chasmCtx.GetNamespaceEntry()
+	nsName := ns.Name().String()
 
 	if !ch.config.Enabled() {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_FEATURE_DISABLED,
 			Message: "Nexus operations disabled",
 		}
 	}
 
-	attrs := command.GetScheduleNexusOperationCommandAttributes()
+	attrs := cmd.GetScheduleNexusOperationCommandAttributes()
 	if attrs == nil {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: "empty ScheduleNexusOperationCommandAttributes",
 		}
 	}
 
 	var endpointID string
-	endpoint, err := ch.endpointRegistry.GetByName(ctx, ns.ID(), attrs.Endpoint)
+	endpoint, err := ch.endpointRegistry.GetByName(chasmCtx.GetContext(), ns.ID(), attrs.Endpoint)
 	if err != nil {
 		if errors.As(err, new(*serviceerror.NotFound)) {
-			return chasmcommand.FailWorkflowTaskError{
+			return command.FailWorkflowTaskError{
 				Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 				Message: fmt.Sprintf("endpoint %q not found", attrs.Endpoint),
 			}
 		} else if errors.As(err, new(*serviceerror.PermissionDenied)) {
-			return chasmcommand.FailWorkflowTaskError{
+			return command.FailWorkflowTaskError{
 				Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 				Message: fmt.Sprintf("caller namespace %q unauthorized for %q", ns.Name(), attrs.Endpoint),
 			}
@@ -44,7 +44,7 @@
 	}
 
 	if len(attrs.Service) > ch.config.MaxServiceNameLength(nsName) {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.Service exceeds length limit of %d",
@@ -54,7 +54,7 @@
 	}
 
 	if len(attrs.Operation) > ch.config.MaxOperationNameLength(nsName) {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.Operation exceeds length limit of %d",
@@ -64,7 +64,7 @@
 	}
 
 	if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToCloseTimeout); err != nil {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.ScheduleToCloseTimeout is invalid: %v", err),
@@ -72,7 +72,7 @@
 	}
 
 	if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToStartTimeout); err != nil {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.ScheduleToStartTimeout is invalid: %v", err),
@@ -80,7 +80,7 @@
 	}
 
 	if err := timestamp.ValidateAndCapProtoDuration(attrs.StartToCloseTimeout); err != nil {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: fmt.Sprintf(
 				"ScheduleNexusOperationCommandAttributes.StartToCloseTimeout is invalid: %v", err),
@@ -88,7 +88,7 @@
 	}
 
 	if !validator.IsValidPayloadSize(attrs.Input.Size()) {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause:             enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message:           "ScheduleNexusOperationCommandAttributes.Input exceeds size limit",
 			TerminateWorkflow: true,
@@ -102,7 +102,7 @@
 		lowerCaseHeader[lowerK] = v
 		headerLength += len(lowerK) + len(v)
 		if slices.Contains(ch.config.DisallowedOperationHeaders(), lowerK) {
-			return chasmcommand.FailWorkflowTaskError{
+			return command.FailWorkflowTaskError{
 				Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 				Message: fmt.Sprintf("ScheduleNexusOperationCommandAttributes.NexusHeader contains a disallowed header key: %q", k),
 			}
@@ -110,28 +110,26 @@
 	}
 
 	if headerLength > ch.config.MaxOperationHeaderSize(nsName) {
-		return chasmcommand.FailWorkflowTaskError{
+		return command.FailWorkflowTaskError{
 			Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
 			Message: "ScheduleNexusOperationCommandAttributes.NexusHeader exceeds size limit",
 		}
 	}
 
-	root := ms.HSM()
-	coll := nexusoperations.MachineCollection(root)
 	maxPendingOperations := ch.config.MaxConcurrentOperations(nsName)
-	if coll.Size() >= ch.config.MaxConcurrentOperations(nsName) {
-		return chasmcommand.FailWorkflowTaskError{
+	if len(wf.Operations) >= maxPendingOperations {
+		return command.FailWorkflowTaskError{
 			Cause:   enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_NEXUS_OPERATIONS_LIMIT_EXCEEDED,
 			Message: fmt.Sprintf("workflow has reached the pending nexus operation limit of %d for this namespace", maxPendingOperations),
 		}
 	}
 
 	// Trim timeout to workflow run timeout.
-	runTimeout := ms.GetExecutionInfo().WorkflowRunTimeout.AsDuration()
+	runTimeout := wf.WorkflowRunTimeout()
 	opTimeout := attrs.ScheduleToCloseTimeout.AsDuration()
 	if runTimeout > 0 && (opTimeout == 0 || opTimeout > runTimeout) {
-		attrs.ScheduleToCloseTimeout = ms.GetExecutionInfo().WorkflowRunTimeout
-		opTimeout = attrs.ScheduleToCloseTimeout.AsDuration()
+		attrs.ScheduleToCloseTimeout = durationpb.New(runTimeout)
+		opTimeout = runTimeout
 	}
 
 	// Trim timeout to max allowed timeout.
@@ -153,7 +151,7 @@
 		}
 	}
 
-	event := ms.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) {
+	event := chasmCtx.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) {
 		he.Attributes = &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{
 			NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{
 				Endpoint:                     attrs.Endpoint,
@@ -166,11 +164,33 @@
 				StartToCloseTimeout:          attrs.StartToCloseTimeout,
 				NexusHeader:                  lowerCaseHeader,
 				RequestId:                    uuid.NewString(),
-				WorkflowTaskCompletedEventId: workflowTaskCompletedEventID,
+				WorkflowTaskCompletedEventId: opts.WorkflowTaskCompletedEventID,
 			},
 		}
-		he.UserMetadata = command.UserMetadata
+		he.UserMetadata = cmd.UserMetadata
 	})
 
-	return nexusoperations.ScheduledEventDefinition{}.Apply(root, event)
+	scheduledEventID := event.GetEventId()
+	scheduledTime := event.GetEventTime()
+	if scheduledTime == nil {
+		scheduledTime = timestamppb.Now()
+	}
+
+	op := nexusoperation.NewOperation(&nexusoperationpb.OperationState{
+		ScheduledTime:    scheduledTime,
+		ScheduledEventId: scheduledEventID,
+		Attempt:          1,
+	})
+
+	if err := nexusoperation.TransitionScheduled.Apply(op, chasmCtx, nexusoperation.EventScheduled{}); err != nil {
+		return err
+	}
+
+	key := strconv.FormatInt(scheduledEventID, 10)
+	if wf.Operations == nil {
+		wf.Operations = make(chasm.Map[string, *nexusoperation.Operation])
+	}
+	wf.Operations[key] = chasm.NewComponentField(chasmCtx, op)
+
+	return nil
 }

@stephanos stephanos marked this pull request as ready for review February 11, 2026 00:12
@stephanos stephanos requested review from a team as code owners February 11, 2026 00:12
@@ -0,0 +1,269 @@
package workflow
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This had to be moved here from chasm/lib/nexusoperation/commands.go to break the cyclical import dependency: CHASM workflow needs nexusoperation's Operation - but nexusoperation (now) required CHASM workflow's Workflow to access the workflow's operations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can create an interface that exposes just the methods you need from Workflow IMHO and keep everything in the same package.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I'm on the fence about this. I want to meet and consider the alternatives.

Copy link
Contributor

@KeithB-Temporal KeithB-Temporal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed these changes, they look pretty good to me. Mostly just moving things around and restructuring to align with the code in the new location. I'd approve, but I want someone with some deeper CHASM knowledge to ensure we are using those primitives correctly and we like the shape.

Copy link
Member

@bergundy bergundy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments but I have a more meta comment.
I am now wondering if making everything so modular is worth it. I really like making event and command definitions something that is registrable and and adhere to a specific interface. We can extend these definitions later and use them for action attribution and other use cases.


// Schedule-to-close timeout for this operation.
// This is the only timeout settable by a workflow.
google.protobuf.Duration schedule_to_close_timeout = 7;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're going to want to add schedule_to_start and start_to_close at this point.

google.protobuf.Timestamp scheduled_time = 6;

// Schedule-to-close timeout for this operation.
// This is the only timeout settable by a workflow.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This is the only timeout settable by a workflow.

string request_id = 8;

// Token for fetching the scheduled event from history.
bytes scheduled_event_token = 9;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to finalize this concept in CHASM. Let's sync in our crew channel.

// ComponentOptions(Component) []ComponentOption

// GetContext returns the underlying context.Context for use in I/O operations (e.g., RPC calls).
GetContext() context.Context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want this exposed in other cases. The command handler is an API handler that should have access to a Go context. I would rather expose the endpoint registry via the context instead.

Comment on lines +32 to +33
// GetNamespaceEntry returns the namespace entry for the execution.
GetNamespaceEntry() *namespace.Namespace
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// GetNamespaceEntry returns the namespace entry for the execution.
GetNamespaceEntry() *namespace.Namespace
// NamespaceEntry returns the namespace entry for the execution.
NamespaceEntry() *namespace.Namespace


Registry *command.Registry
Config *nexusoperation.Config
EndpointRegistry commonnexus.EndpointRegistry `optional:"true"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because we are registering the workflow lib in matching unnecessarily. Let's avoid that.

// worker request.
type Handler func(
chasmCtx chasm.MutableContext,
wf *chasmworkflow.Workflow,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would define this as an interface to prevent the circular dependency issue.

@@ -0,0 +1,269 @@
package workflow
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I'm on the fence about this. I want to meet and consider the alternatives.

Attempt: 1,
})

if err := nexusoperation.TransitionScheduled.Apply(op, chasmCtx, nexusoperation.EventScheduled{}); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have this logic be part of history event application so it's not duplicated. In HSM we made some compromises here but we don't need to carry those to CHASM.

// worker request.
type Handler func(
chasmCtx chasm.MutableContext,
wf *chasmworkflow.Workflow,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other approaches too. Because you already have the workflow in a parent pointer, you could potentially use that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants