Nexus ScheduleCommand in CHASM#9278
Nexus ScheduleCommand in CHASM#9278stephanos wants to merge 2 commits intotemporalio:nexus/hsm-to-chasm-migrationfrom
Conversation
248e7e1 to
321adf6
Compare
| // ComponentOptions(Component) []ComponentOption | ||
|
|
||
| // GetContext returns the underlying context.Context for use in I/O operations (e.g., RPC calls). | ||
| GetContext() context.Context |
There was a problem hiding this comment.
This had to become exported because of the h.endpointRegistry.GetByName call in the command handler. I've considered a few alternative options (e.g. adding ctx to the handler interface, adding GetNexusEndpoint to MutableContext ...); but this seemed like the best in the end. Happy to discuss!
There was a problem hiding this comment.
We don't want this exposed in other cases. The command handler is an API handler that should have access to a Go context. I would rather expose the endpoint registry via the context instead.
|
|
||
| if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToStartTimeout); err != nil { | ||
| return workflow.FailWorkflowTaskError{ | ||
| return chasmcommand.FailWorkflowTaskError{ |
There was a problem hiding this comment.
This had moved in the last PR.
| } | ||
|
|
||
| var transitionScheduled = chasm.NewTransition( | ||
| var TransitionScheduled = chasm.NewTransition( |
There was a problem hiding this comment.
Required to be accessible from chasm/lib/nexusoperation/workflow/commands.go
1abb55d to
23e350b
Compare
| google.protobuf.Timestamp next_attempt_schedule_time = 7; | ||
| temporal.api.failure.v1.Failure last_attempt_failure = 8; | ||
| int64 scheduled_event_id = 9; | ||
| } |
There was a problem hiding this comment.
Modelled after NexusOperationInfo in proto/internal/temporal/server/api/persistence/v1/executions.proto.
|
|
||
| Registry *command.Registry | ||
| Config *nexusoperation.Config | ||
| EndpointRegistry commonnexus.EndpointRegistry `optional:"true"` |
There was a problem hiding this comment.
Without this, matching doesn't start.
There was a problem hiding this comment.
This is because we are registering the workflow lib in matching unnecessarily. Let's avoid that.
5ad0b10 to
57669e0
Compare
| ) | ||
| } | ||
|
|
||
| func (ch *commandHandler) handleScheduleCommand( |
There was a problem hiding this comment.
migrated from components/nexusoperations/workflow/commands.go
There was a problem hiding this comment.
diff
--- components/nexusoperations/workflow/commands.go
+++ chasm/lib/nexusoperation/workflow/commands.go
@@ -1,38 +1,38 @@
-func (ch *commandHandler) HandleScheduleCommand(
- ctx context.Context,
- ms historyi.MutableState,
- validator chasmcommand.Validator,
- workflowTaskCompletedEventID int64,
- command *commandpb.Command,
+func (ch *commandHandler) handleScheduleCommand(
+ chasmCtx chasm.MutableContext,
+ wf *workflow.Workflow,
+ validator command.Validator,
+ cmd *commandpb.Command,
+ opts command.HandlerOptions,
) error {
- ns := ms.GetNamespaceEntry()
- nsName := ms.GetNamespaceEntry().Name().String()
+ ns := chasmCtx.GetNamespaceEntry()
+ nsName := ns.Name().String()
if !ch.config.Enabled() {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_FEATURE_DISABLED,
Message: "Nexus operations disabled",
}
}
- attrs := command.GetScheduleNexusOperationCommandAttributes()
+ attrs := cmd.GetScheduleNexusOperationCommandAttributes()
if attrs == nil {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: "empty ScheduleNexusOperationCommandAttributes",
}
}
var endpointID string
- endpoint, err := ch.endpointRegistry.GetByName(ctx, ns.ID(), attrs.Endpoint)
+ endpoint, err := ch.endpointRegistry.GetByName(chasmCtx.GetContext(), ns.ID(), attrs.Endpoint)
if err != nil {
if errors.As(err, new(*serviceerror.NotFound)) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("endpoint %q not found", attrs.Endpoint),
}
} else if errors.As(err, new(*serviceerror.PermissionDenied)) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("caller namespace %q unauthorized for %q", ns.Name(), attrs.Endpoint),
}
@@ -44,7 +44,7 @@
}
if len(attrs.Service) > ch.config.MaxServiceNameLength(nsName) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.Service exceeds length limit of %d",
@@ -54,7 +54,7 @@
}
if len(attrs.Operation) > ch.config.MaxOperationNameLength(nsName) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.Operation exceeds length limit of %d",
@@ -64,7 +64,7 @@
}
if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToCloseTimeout); err != nil {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.ScheduleToCloseTimeout is invalid: %v", err),
@@ -72,7 +72,7 @@
}
if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToStartTimeout); err != nil {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.ScheduleToStartTimeout is invalid: %v", err),
@@ -80,7 +80,7 @@
}
if err := timestamp.ValidateAndCapProtoDuration(attrs.StartToCloseTimeout); err != nil {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.StartToCloseTimeout is invalid: %v", err),
@@ -88,7 +88,7 @@
}
if !validator.IsValidPayloadSize(attrs.Input.Size()) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: "ScheduleNexusOperationCommandAttributes.Input exceeds size limit",
TerminateWorkflow: true,
@@ -102,7 +102,7 @@
lowerCaseHeader[lowerK] = v
headerLength += len(lowerK) + len(v)
if slices.Contains(ch.config.DisallowedOperationHeaders(), lowerK) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("ScheduleNexusOperationCommandAttributes.NexusHeader contains a disallowed header key: %q", k),
}
@@ -110,28 +110,26 @@
}
if headerLength > ch.config.MaxOperationHeaderSize(nsName) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: "ScheduleNexusOperationCommandAttributes.NexusHeader exceeds size limit",
}
}
- root := ms.HSM()
- coll := nexusoperations.MachineCollection(root)
maxPendingOperations := ch.config.MaxConcurrentOperations(nsName)
- if coll.Size() >= ch.config.MaxConcurrentOperations(nsName) {
- return chasmcommand.FailWorkflowTaskError{
+ if wf.PendingNexusOperationCount() >= maxPendingOperations {
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_NEXUS_OPERATIONS_LIMIT_EXCEEDED,
Message: fmt.Sprintf("workflow has reached the pending nexus operation limit of %d for this namespace", maxPendingOperations),
}
}
// Trim timeout to workflow run timeout.
- runTimeout := ms.GetExecutionInfo().WorkflowRunTimeout.AsDuration()
+ runTimeout := wf.WorkflowRunTimeout()
opTimeout := attrs.ScheduleToCloseTimeout.AsDuration()
if runTimeout > 0 && (opTimeout == 0 || opTimeout > runTimeout) {
- attrs.ScheduleToCloseTimeout = ms.GetExecutionInfo().WorkflowRunTimeout
- opTimeout = attrs.ScheduleToCloseTimeout.AsDuration()
+ attrs.ScheduleToCloseTimeout = durationpb.New(runTimeout)
+ opTimeout = runTimeout
}
// Trim timeout to max allowed timeout.
@@ -153,7 +151,8 @@
}
}
- event := ms.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) {
+ requestID := uuid.NewString()
+ event := chasmCtx.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) {
he.Attributes = &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{
NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{
Endpoint: attrs.Endpoint,
@@ -165,12 +164,35 @@
ScheduleToStartTimeout: attrs.ScheduleToStartTimeout,
StartToCloseTimeout: attrs.StartToCloseTimeout,
NexusHeader: lowerCaseHeader,
- RequestId: uuid.NewString(),
- WorkflowTaskCompletedEventId: workflowTaskCompletedEventID,
+ RequestId: requestID,
+ WorkflowTaskCompletedEventId: opts.WorkflowTaskCompletedEventID,
},
}
- he.UserMetadata = command.UserMetadata
+ he.UserMetadata = cmd.UserMetadata
})
- return nexusoperations.ScheduledEventDefinition{}.Apply(root, event)
+ scheduledTime := event.GetEventTime()
+ if scheduledTime == nil {
+ scheduledTime = timestamppb.Now()
+ }
+
+ op := nexusoperation.NewOperation(&nexusoperationpb.OperationState{
+ EndpointId: endpointID,
+ Endpoint: attrs.Endpoint,
+ Service: attrs.Service,
+ Operation: attrs.Operation,
+ ScheduledTime: scheduledTime,
+ ScheduleToCloseTimeout: attrs.ScheduleToCloseTimeout,
+ RequestId: requestID,
+ Attempt: 1,
+ })
+
+ if err := nexusoperation.TransitionScheduled.Apply(op, chasmCtx, nexusoperation.EventScheduled{}); err != nil {
+ return err
+ }
+
+ key := strconv.FormatInt(event.GetEventId(), 10)
+ wf.AddNexusOperation(chasmCtx, key, op)
+
+ return nil
}
5175002 to
6d1d649
Compare
|
|
||
| option go_package = "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb;nexusoperationpb"; | ||
|
|
||
| message OperationState { |
There was a problem hiding this comment.
Made this the exact same as in Chetan's PR https://github.com/temporalio/temporal/pull/9081/changes#diff-acb4219132cc3df0b2bedbcfe29621174e7fa552f2dd1c02834ce096f1be5f8f
6d1d649 to
703b0f6
Compare
| func (w *Workflow) PendingNexusOperationCount() int { | ||
| return len(w.Operations) | ||
| } | ||
|
|
There was a problem hiding this comment.
^ Technically these are not needed as Operations is exported; but it's my understanding that that is for technical reasons only and this makes for better encapsulation.
47827cc to
e0f25c7
Compare
e0f25c7 to
02c1013
Compare
| // worker request. | ||
| type Handler func( | ||
| chasmCtx chasm.MutableContext, | ||
| wf *chasmworkflow.Workflow, |
There was a problem hiding this comment.
Had to change the Handler interface to gain access to workflow's Operations. I've considered a few options here and this one felt the most natural. Happy to discuss.
There was a problem hiding this comment.
I would define this as an interface to prevent the circular dependency issue.
There was a problem hiding this comment.
There are other approaches too. Because you already have the workflow in a parent pointer, you could potentially use that.
| @@ -0,0 +1,880 @@ | |||
| package workflow | |||
There was a problem hiding this comment.
Ported from components/nexusoperations/workflow/commands_test.go
There was a problem hiding this comment.
diff
--- components/nexusoperations/workflow/commands.go
+++ chasm/lib/nexusoperation/workflow/commands.go
@@ -1,38 +1,38 @@
-func (ch *commandHandler) HandleScheduleCommand(
- ctx context.Context,
- ms historyi.MutableState,
- validator chasmcommand.Validator,
- workflowTaskCompletedEventID int64,
- command *commandpb.Command,
+func (ch *commandHandler) handleScheduleCommand(
+ chasmCtx chasm.MutableContext,
+ wf *workflow.Workflow,
+ validator command.Validator,
+ cmd *commandpb.Command,
+ opts command.HandlerOptions,
) error {
- ns := ms.GetNamespaceEntry()
- nsName := ms.GetNamespaceEntry().Name().String()
+ ns := chasmCtx.GetNamespaceEntry()
+ nsName := ns.Name().String()
if !ch.config.Enabled() {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_FEATURE_DISABLED,
Message: "Nexus operations disabled",
}
}
- attrs := command.GetScheduleNexusOperationCommandAttributes()
+ attrs := cmd.GetScheduleNexusOperationCommandAttributes()
if attrs == nil {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: "empty ScheduleNexusOperationCommandAttributes",
}
}
var endpointID string
- endpoint, err := ch.endpointRegistry.GetByName(ctx, ns.ID(), attrs.Endpoint)
+ endpoint, err := ch.endpointRegistry.GetByName(chasmCtx.GetContext(), ns.ID(), attrs.Endpoint)
if err != nil {
if errors.As(err, new(*serviceerror.NotFound)) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("endpoint %q not found", attrs.Endpoint),
}
} else if errors.As(err, new(*serviceerror.PermissionDenied)) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("caller namespace %q unauthorized for %q", ns.Name(), attrs.Endpoint),
}
@@ -44,7 +44,7 @@
}
if len(attrs.Service) > ch.config.MaxServiceNameLength(nsName) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.Service exceeds length limit of %d",
@@ -54,7 +54,7 @@
}
if len(attrs.Operation) > ch.config.MaxOperationNameLength(nsName) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.Operation exceeds length limit of %d",
@@ -64,7 +64,7 @@
}
if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToCloseTimeout); err != nil {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.ScheduleToCloseTimeout is invalid: %v", err),
@@ -72,7 +72,7 @@
}
if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToStartTimeout); err != nil {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.ScheduleToStartTimeout is invalid: %v", err),
@@ -80,7 +80,7 @@
}
if err := timestamp.ValidateAndCapProtoDuration(attrs.StartToCloseTimeout); err != nil {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf(
"ScheduleNexusOperationCommandAttributes.StartToCloseTimeout is invalid: %v", err),
@@ -88,7 +88,7 @@
}
if !validator.IsValidPayloadSize(attrs.Input.Size()) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: "ScheduleNexusOperationCommandAttributes.Input exceeds size limit",
TerminateWorkflow: true,
@@ -102,7 +102,7 @@
lowerCaseHeader[lowerK] = v
headerLength += len(lowerK) + len(v)
if slices.Contains(ch.config.DisallowedOperationHeaders(), lowerK) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("ScheduleNexusOperationCommandAttributes.NexusHeader contains a disallowed header key: %q", k),
}
@@ -110,28 +110,26 @@
}
if headerLength > ch.config.MaxOperationHeaderSize(nsName) {
- return chasmcommand.FailWorkflowTaskError{
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES,
Message: "ScheduleNexusOperationCommandAttributes.NexusHeader exceeds size limit",
}
}
- root := ms.HSM()
- coll := nexusoperations.MachineCollection(root)
maxPendingOperations := ch.config.MaxConcurrentOperations(nsName)
- if coll.Size() >= ch.config.MaxConcurrentOperations(nsName) {
- return chasmcommand.FailWorkflowTaskError{
+ if len(wf.Operations) >= maxPendingOperations {
+ return command.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_NEXUS_OPERATIONS_LIMIT_EXCEEDED,
Message: fmt.Sprintf("workflow has reached the pending nexus operation limit of %d for this namespace", maxPendingOperations),
}
}
// Trim timeout to workflow run timeout.
- runTimeout := ms.GetExecutionInfo().WorkflowRunTimeout.AsDuration()
+ runTimeout := wf.WorkflowRunTimeout()
opTimeout := attrs.ScheduleToCloseTimeout.AsDuration()
if runTimeout > 0 && (opTimeout == 0 || opTimeout > runTimeout) {
- attrs.ScheduleToCloseTimeout = ms.GetExecutionInfo().WorkflowRunTimeout
- opTimeout = attrs.ScheduleToCloseTimeout.AsDuration()
+ attrs.ScheduleToCloseTimeout = durationpb.New(runTimeout)
+ opTimeout = runTimeout
}
// Trim timeout to max allowed timeout.
@@ -153,7 +151,7 @@
}
}
- event := ms.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) {
+ event := chasmCtx.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) {
he.Attributes = &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{
NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{
Endpoint: attrs.Endpoint,
@@ -166,11 +164,33 @@
StartToCloseTimeout: attrs.StartToCloseTimeout,
NexusHeader: lowerCaseHeader,
RequestId: uuid.NewString(),
- WorkflowTaskCompletedEventId: workflowTaskCompletedEventID,
+ WorkflowTaskCompletedEventId: opts.WorkflowTaskCompletedEventID,
},
}
- he.UserMetadata = command.UserMetadata
+ he.UserMetadata = cmd.UserMetadata
})
- return nexusoperations.ScheduledEventDefinition{}.Apply(root, event)
+ scheduledEventID := event.GetEventId()
+ scheduledTime := event.GetEventTime()
+ if scheduledTime == nil {
+ scheduledTime = timestamppb.Now()
+ }
+
+ op := nexusoperation.NewOperation(&nexusoperationpb.OperationState{
+ ScheduledTime: scheduledTime,
+ ScheduledEventId: scheduledEventID,
+ Attempt: 1,
+ })
+
+ if err := nexusoperation.TransitionScheduled.Apply(op, chasmCtx, nexusoperation.EventScheduled{}); err != nil {
+ return err
+ }
+
+ key := strconv.FormatInt(scheduledEventID, 10)
+ if wf.Operations == nil {
+ wf.Operations = make(chasm.Map[string, *nexusoperation.Operation])
+ }
+ wf.Operations[key] = chasm.NewComponentField(chasmCtx, op)
+
+ return nil
}
| @@ -0,0 +1,269 @@ | |||
| package workflow | |||
There was a problem hiding this comment.
This had to be moved here from chasm/lib/nexusoperation/commands.go to break the cyclical import dependency: CHASM workflow needs nexusoperation's Operation - but nexusoperation (now) required CHASM workflow's Workflow to access the workflow's operations.
There was a problem hiding this comment.
You can create an interface that exposes just the methods you need from Workflow IMHO and keep everything in the same package.
There was a problem hiding this comment.
But I'm on the fence about this. I want to meet and consider the alternatives.
KeithB-Temporal
left a comment
There was a problem hiding this comment.
I reviewed these changes, they look pretty good to me. Mostly just moving things around and restructuring to align with the code in the new location. I'd approve, but I want someone with some deeper CHASM knowledge to ensure we are using those primitives correctly and we like the shape.
bergundy
left a comment
There was a problem hiding this comment.
I added some comments but I have a more meta comment.
I am now wondering if making everything so modular is worth it. I really like making event and command definitions something that is registrable and and adhere to a specific interface. We can extend these definitions later and use them for action attribution and other use cases.
|
|
||
| // Schedule-to-close timeout for this operation. | ||
| // This is the only timeout settable by a workflow. | ||
| google.protobuf.Duration schedule_to_close_timeout = 7; |
There was a problem hiding this comment.
You're going to want to add schedule_to_start and start_to_close at this point.
| google.protobuf.Timestamp scheduled_time = 6; | ||
|
|
||
| // Schedule-to-close timeout for this operation. | ||
| // This is the only timeout settable by a workflow. |
There was a problem hiding this comment.
| // This is the only timeout settable by a workflow. |
| string request_id = 8; | ||
|
|
||
| // Token for fetching the scheduled event from history. | ||
| bytes scheduled_event_token = 9; |
There was a problem hiding this comment.
We need to finalize this concept in CHASM. Let's sync in our crew channel.
| // ComponentOptions(Component) []ComponentOption | ||
|
|
||
| // GetContext returns the underlying context.Context for use in I/O operations (e.g., RPC calls). | ||
| GetContext() context.Context |
There was a problem hiding this comment.
We don't want this exposed in other cases. The command handler is an API handler that should have access to a Go context. I would rather expose the endpoint registry via the context instead.
| // GetNamespaceEntry returns the namespace entry for the execution. | ||
| GetNamespaceEntry() *namespace.Namespace |
There was a problem hiding this comment.
| // GetNamespaceEntry returns the namespace entry for the execution. | |
| GetNamespaceEntry() *namespace.Namespace | |
| // NamespaceEntry returns the namespace entry for the execution. | |
| NamespaceEntry() *namespace.Namespace |
|
|
||
| Registry *command.Registry | ||
| Config *nexusoperation.Config | ||
| EndpointRegistry commonnexus.EndpointRegistry `optional:"true"` |
There was a problem hiding this comment.
This is because we are registering the workflow lib in matching unnecessarily. Let's avoid that.
| // worker request. | ||
| type Handler func( | ||
| chasmCtx chasm.MutableContext, | ||
| wf *chasmworkflow.Workflow, |
There was a problem hiding this comment.
I would define this as an interface to prevent the circular dependency issue.
| @@ -0,0 +1,269 @@ | |||
| package workflow | |||
There was a problem hiding this comment.
But I'm on the fence about this. I want to meet and consider the alternatives.
| Attempt: 1, | ||
| }) | ||
|
|
||
| if err := nexusoperation.TransitionScheduled.Apply(op, chasmCtx, nexusoperation.EventScheduled{}); err != nil { |
There was a problem hiding this comment.
It would be good to have this logic be part of history event application so it's not duplicated. In HSM we made some compromises here but we don't need to carry those to CHASM.
| // worker request. | ||
| type Handler func( | ||
| chasmCtx chasm.MutableContext, | ||
| wf *chasmworkflow.Workflow, |
There was a problem hiding this comment.
There are other approaches too. Because you already have the workflow in a parent pointer, you could potentially use that.
What changed?
Ported command handler for Nexus "schedule" command from HSM to CHASM.
Why?
CHASM migration.
How did you test it?