-
Notifications
You must be signed in to change notification settings - Fork 286
Versioning/upgrade on can #2171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
9ea25b6
e7d8ece
f431242
543ec18
ed2b1d6
e2eacaf
5eb988c
d86c4ea
469dc51
be0c1e6
82b1c22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -193,6 +193,10 @@ type ( | |
| // compatible build ID or not. See VersioningIntent. | ||
| VersioningIntent VersioningIntent | ||
|
|
||
| // InitialVersioningBehavior specifies the versioning behavior that the first task of the new run should use. | ||
| // For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version of the previous run. | ||
| InitialVersioningBehavior ContinueAsNewVersioningBehavior | ||
|
|
||
| // This is by default nil but may be overridden using NewContinueAsNewErrorWithOptions. | ||
| // It specifies the retry policy which gets carried over to the next run. | ||
| // If not set, the current workflow's retry policy will be carried over automatically. | ||
|
|
@@ -212,6 +216,10 @@ type ( | |
| // RetryPolicy specifies the retry policy to be used for the next run. | ||
| // If nil, the current workflow's retry policy will be used. | ||
| RetryPolicy *RetryPolicy | ||
|
|
||
| // InitialVersioningBehavior specifies the versioning behavior that the first task of the new run should use. | ||
| // For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version of the previous run. | ||
| InitialVersioningBehavior ContinueAsNewVersioningBehavior | ||
| } | ||
|
|
||
| // UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist | ||
|
|
@@ -555,6 +563,9 @@ func NewContinueAsNewErrorWithOptions(ctx Context, options ContinueAsNewErrorOpt | |
| if options.RetryPolicy != nil { | ||
| continueAsNewErr.RetryPolicy = options.RetryPolicy | ||
| } | ||
| if options.InitialVersioningBehavior != ContinueAsNewVersioningBehaviorUnspecified { | ||
|
||
| continueAsNewErr.InitialVersioningBehavior = options.InitialVersioningBehavior | ||
| } | ||
| } | ||
|
|
||
| return err | ||
|
|
@@ -582,15 +593,16 @@ func (wc *workflowEnvironmentInterceptor) NewContinueAsNewError( | |
| } | ||
|
|
||
| return &ContinueAsNewError{ | ||
| WorkflowType: workflowType, | ||
| Input: input, | ||
| Header: header, | ||
| TaskQueueName: options.TaskQueueName, | ||
| WorkflowExecutionTimeout: options.WorkflowExecutionTimeout, | ||
| WorkflowRunTimeout: options.WorkflowRunTimeout, | ||
| WorkflowTaskTimeout: options.WorkflowTaskTimeout, | ||
| VersioningIntent: options.VersioningIntent, | ||
| RetryPolicy: nil, // The retry policy can't be propagated like other options due to #676. | ||
| WorkflowType: workflowType, | ||
| Input: input, | ||
| Header: header, | ||
| TaskQueueName: options.TaskQueueName, | ||
| WorkflowExecutionTimeout: options.WorkflowExecutionTimeout, | ||
| WorkflowRunTimeout: options.WorkflowRunTimeout, | ||
| WorkflowTaskTimeout: options.WorkflowTaskTimeout, | ||
| VersioningIntent: options.VersioningIntent, | ||
| InitialVersioningBehavior: options.InitialVersioningBehavior, | ||
| RetryPolicy: nil, // The retry policy can't be propagated like other options due to #676. | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1207,6 +1207,9 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( | |
| // Update workflow info fields | ||
| weh.workflowInfo.currentHistoryLength = int(event.EventId) | ||
| weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew() | ||
| weh.workflowInfo.continueAsNewSuggestedReasons = convertContinueAsNewSuggestedReasonsFromProto( | ||
| event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNewReasons(), | ||
| ) | ||
| weh.workflowInfo.currentHistorySize = int(event.GetWorkflowTaskStartedEventAttributes().GetHistorySizeBytes()) | ||
| // Reset the counter on command helper used for generating ID for commands | ||
| weh.commandsHelper.setCurrentWorkflowTaskStartedEventID(event.GetEventId()) | ||
|
|
@@ -2162,3 +2165,13 @@ func (weh *workflowExecutionEventHandlerImpl) protocolConstructorForMessage( | |
| } | ||
| return nil, fmt.Errorf("unsupported protocol: %v", protoName) | ||
| } | ||
|
|
||
| func convertContinueAsNewSuggestedReasonsFromProto( | ||
| reasons []enumspb.SuggestContinueAsNewReason, | ||
| ) []ContinueAsNewSuggestedReason { | ||
| var converted []ContinueAsNewSuggestedReason | ||
|
||
| for _, reason := range reasons { | ||
| converted = append(converted, ContinueAsNewSuggestedReason(reason)) | ||
| } | ||
| return converted | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be marked experimental IMO