-
Notifications
You must be signed in to change notification settings - Fork 1.5k
add time-skipping configuration to UpdateWorkflowExecutionOptions
#9944
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -693,7 +693,6 @@ func (wh *WorkflowHandler) validateTimeSkippingConfig( | |
| timeSkippingConfig *workflowpb.TimeSkippingConfig, | ||
| namespaceName namespace.Name, | ||
| ) error { | ||
|
|
||
| if timeSkippingConfig == nil { | ||
| return nil | ||
| } | ||
|
|
@@ -722,7 +721,7 @@ func (wh *WorkflowHandler) validateTimeSkippingConfig( | |
| namespace.MinTimeSkippingDuration, | ||
| ) | ||
| } | ||
| // todo: will need to check current virtual time in updateOptions scenario | ||
| // todo: need to adapt the timeSource after time-skipping timeSource is implemented | ||
| case *workflowpb.TimeSkippingConfig_MaxTargetTime: | ||
| if bound.MaxTargetTime.AsTime().Before(wh.namespaceHandler.timeSource.Now().Add(namespace.MinTimeSkippingDuration)) { | ||
| return serviceerror.NewUnimplementedf( | ||
|
|
@@ -2456,6 +2455,17 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, request * | |
| return nil, serviceerror.NewInternalf("unknown reset reapply type: %v", request.GetResetReapplyType()) | ||
| } | ||
|
|
||
| for _, postOp := range request.GetPostResetOperations() { | ||
| if updateOpts := postOp.GetUpdateWorkflowOptions(); updateOpts != nil { | ||
| if err := wh.validateTimeSkippingConfig( | ||
| updateOpts.GetWorkflowExecutionOptions().GetTimeSkippingConfig(), | ||
| namespace.Name(request.GetNamespace()), | ||
| ); err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
| } | ||
|
|
||
| namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) | ||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -5669,9 +5679,25 @@ func (wh *WorkflowHandler) StartBatchOperation( | |
| case *workflowservice.StartBatchOperationRequest_ResetOperation: | ||
| input.BatchType = enumspb.BATCH_OPERATION_TYPE_RESET | ||
| identity = op.ResetOperation.GetIdentity() | ||
| for _, postOp := range op.ResetOperation.GetPostResetOperations() { | ||
| if updateOpts := postOp.GetUpdateWorkflowOptions(); updateOpts != nil { | ||
| if err := wh.validateTimeSkippingConfig( | ||
| updateOpts.GetWorkflowExecutionOptions().GetTimeSkippingConfig(), | ||
| namespace.Name(request.GetNamespace()), | ||
| ); err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
| } | ||
| case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation: | ||
| input.BatchType = enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS | ||
| identity = op.UpdateWorkflowOptionsOperation.GetIdentity() | ||
| if err := wh.validateTimeSkippingConfig( | ||
| op.UpdateWorkflowOptionsOperation.GetWorkflowExecutionOptions().GetTimeSkippingConfig(), | ||
| namespace.Name(request.GetNamespace()), | ||
| ); err != nil { | ||
| return nil, err | ||
| } | ||
| case *workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation: | ||
| input.BatchType = enumspb.BATCH_OPERATION_TYPE_UNPAUSE_ACTIVITY | ||
| identity = op.UnpauseActivitiesOperation.GetIdentity() | ||
|
|
@@ -6876,6 +6902,9 @@ func (wh *WorkflowHandler) UpdateWorkflowExecutionOptions( | |
| if err := priorities.Validate(opts.GetPriority()); err != nil { | ||
| return nil, err | ||
| } | ||
| if err := wh.validateTimeSkippingConfig(opts.GetTimeSkippingConfig(), namespace.Name(request.GetNamespace())); err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, that is my understanding of how dynamic config feature flag works -> we don't allow users turn on a feature and have a bunch of workflows running with this feature and then turn off the feature and still want to try to manipulate the feature. this kind of flexibility will make the system complicated.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I may understand this incorrectly. cc @yycptt for a confirmation
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah that makes sense. The one I pointed out is not a normal case scenario.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we don't support removing a feature flag after users enable it? I don't know I feel that will be weird. |
||
| return nil, err | ||
| } | ||
|
|
||
| namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) | ||
| if err != nil { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -151,7 +151,7 @@ func MergeAndApply( | |
| if mergedOpts.GetVersioningOverride() == nil { | ||
| unsetOverride = true | ||
| } | ||
| _, err = ms.AddWorkflowExecutionOptionsUpdatedEvent(mergedOpts.GetVersioningOverride(), unsetOverride, "", nil, nil, identity, mergedOpts.GetPriority()) | ||
| _, err = ms.AddWorkflowExecutionOptionsUpdatedEvent(mergedOpts.GetVersioningOverride(), unsetOverride, "", nil, nil, identity, mergedOpts.GetPriority(), mergedOpts.GetTimeSkippingConfig()) | ||
| if err != nil { | ||
| return nil, hasChanges, err | ||
| } | ||
|
|
@@ -172,6 +172,11 @@ func getOptionsFromMutableState(ms historyi.MutableState) *workflowpb.WorkflowEx | |
| opts.Priority = cloned | ||
| } | ||
| } | ||
| if tsInfo := ms.GetExecutionInfo().GetTimeSkippingInfo(); tsInfo != nil { | ||
| if cloned, ok := proto.Clone(tsInfo.GetConfig()).(*workflowpb.TimeSkippingConfig); ok { | ||
| opts.TimeSkippingConfig = cloned | ||
| } | ||
| } | ||
| return opts | ||
| } | ||
|
|
||
|
|
@@ -230,5 +235,54 @@ func mergeWorkflowExecutionOptions( | |
| mergeInto.Priority.FairnessWeight = mergeFrom.Priority.GetFairnessWeight() | ||
| } | ||
|
|
||
| // ==== Time Skipping Config | ||
| // nil means "no change" — only update if the caller provided an explicit value. | ||
| if _, ok := updateFields["timeSkippingConfig"]; ok { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can updateFields contain something like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this commit : d72dcb3 |
||
| if mergeFrom.GetTimeSkippingConfig() != nil { | ||
| mergeInto.TimeSkippingConfig = mergeFrom.GetTimeSkippingConfig() | ||
| } | ||
| } | ||
|
|
||
| if _, ok := updateFields["timeSkippingConfig.enabled"]; ok { | ||
| if mergeInto.TimeSkippingConfig == nil { | ||
| mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{} | ||
| } | ||
| mergeInto.TimeSkippingConfig.Enabled = mergeFrom.GetTimeSkippingConfig().GetEnabled() | ||
| } | ||
|
|
||
| if _, ok := updateFields["timeSkippingConfig.disablePropagation"]; ok { | ||
| if mergeInto.TimeSkippingConfig == nil { | ||
| mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{} | ||
| } | ||
| mergeInto.TimeSkippingConfig.DisablePropagation = mergeFrom.GetTimeSkippingConfig().GetDisablePropagation() | ||
| } | ||
|
|
||
| if _, ok := updateFields["timeSkippingConfig.maxSkippedDuration"]; ok { | ||
| if mergeInto.TimeSkippingConfig == nil { | ||
| mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{} | ||
| } | ||
| mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxSkippedDuration{ | ||
| MaxSkippedDuration: mergeFrom.GetTimeSkippingConfig().GetMaxSkippedDuration(), | ||
| } | ||
| } | ||
|
|
||
| if _, ok := updateFields["timeSkippingConfig.maxElapsedDuration"]; ok { | ||
| if mergeInto.TimeSkippingConfig == nil { | ||
| mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{} | ||
| } | ||
| mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxElapsedDuration{ | ||
| MaxElapsedDuration: mergeFrom.GetTimeSkippingConfig().GetMaxElapsedDuration(), | ||
| } | ||
| } | ||
|
|
||
| if _, ok := updateFields["timeSkippingConfig.maxTargetTime"]; ok { | ||
| if mergeInto.TimeSkippingConfig == nil { | ||
| mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{} | ||
| } | ||
| mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxTargetTime{ | ||
| MaxTargetTime: mergeFrom.GetTimeSkippingConfig().GetMaxTargetTime(), | ||
| } | ||
| } | ||
|
|
||
| return mergeInto, nil | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw better to fix this as well and return invalid argument error.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I already have another patch for this together with other patches for feature-1. no worries all patches needed are noted