From ce0ec33fc4e43d9487240297ff10255df827e85e Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Mon, 22 Aug 2022 22:34:48 -0700 Subject: [PATCH 01/12] Prefix sub-lp exec id with the parent exec-id Signed-off-by: Haytham Abuelfutuh --- pkg/apis/flyteworkflow/v1alpha1/iface.go | 14 ++++++- .../flyteworkflow/v1alpha1/node_status.go | 13 +++++++ pkg/controller/nodes/handler/state.go | 5 ++- .../nodes/handler/transition_info.go | 1 + pkg/controller/nodes/subworkflow/handler.go | 7 +++- .../nodes/subworkflow/launchplan.go | 39 ++++++++++++++++--- pkg/controller/nodes/subworkflow/util.go | 23 +++++++++++ pkg/controller/nodes/transformers.go | 1 + 8 files changed, 92 insertions(+), 11 deletions(-) diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index 26cfd2321..b71ebd925 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -236,8 +236,8 @@ type MutableDynamicNodeStatus interface { SetExecutionError(executionError *core.ExecutionError) } -// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus. -// p returns ExecutableBranchNodeStatus, which permits some mutations +// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the +// GetExecutionStatus. p returns ExecutableBranchNodeStatus, which permits some mutations type ExecutableBranchNode interface { GetIf() ExecutableIfBlock GetElse() *NodeID @@ -246,6 +246,7 @@ type ExecutableBranchNode interface { } type ExecutableWorkflowNodeStatus interface { + Versioned GetWorkflowNodePhase() WorkflowNodePhase GetExecutionError() *core.ExecutionError } @@ -253,6 +254,7 @@ type ExecutableWorkflowNodeStatus interface { type MutableWorkflowNodeStatus interface { Mutable ExecutableWorkflowNodeStatus + MutableVersioned SetWorkflowNodePhase(phase WorkflowNodePhase) SetExecutionError(executionError *core.ExecutionError) } @@ -261,6 +263,14 @@ type Mutable interface { IsDirty() bool } +type Versioned interface { + GetVersion() uint32 +} + +type MutableVersioned interface { + SetVersion(version uint32) +} + type MutableNodeStatus interface { Mutable // Mutation API's diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go index cc78b492a..72a57fb6b 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -17,6 +17,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type VersionedStruct struct { + version uint32 +} + +func (in *VersionedStruct) SetVersion(version uint32) { + in.version = version +} + +func (in VersionedStruct) GetVersion() uint32 { + return in.version +} + type MutableStruct struct { isDirty bool } @@ -158,6 +170,7 @@ const ( type WorkflowNodeStatus struct { MutableStruct + VersionedStruct Phase WorkflowNodePhase `json:"phase,omitempty"` ExecutionError *core.ExecutionError `json:"executionError,omitempty"` } diff --git a/pkg/controller/nodes/handler/state.go b/pkg/controller/nodes/handler/state.go index 2e456d817..3512a6c25 100644 --- a/pkg/controller/nodes/handler/state.go +++ b/pkg/controller/nodes/handler/state.go @@ -35,8 +35,9 @@ type DynamicNodeState struct { } type WorkflowNodeState struct { - Phase v1alpha1.WorkflowNodePhase - Error *core.ExecutionError + Phase v1alpha1.WorkflowNodePhase + Error *core.ExecutionError + Version uint32 } type NodeStateWriter interface { diff --git a/pkg/controller/nodes/handler/transition_info.go b/pkg/controller/nodes/handler/transition_info.go index d12ff9f8a..d9c31a60c 100644 --- a/pkg/controller/nodes/handler/transition_info.go +++ b/pkg/controller/nodes/handler/transition_info.go @@ -39,6 +39,7 @@ type DynamicNodeInfo struct { type WorkflowNodeInfo struct { LaunchedWorkflowID *core.WorkflowExecutionIdentifier + Version uint32 } type BranchNodeInfo struct { diff --git a/pkg/controller/nodes/subworkflow/handler.go b/pkg/controller/nodes/subworkflow/handler.go index bf7b5c393..7fefcd880 100644 --- a/pkg/controller/nodes/subworkflow/handler.go +++ b/pkg/controller/nodes/subworkflow/handler.go @@ -58,7 +58,12 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecu return transition, err } - workflowNodeState := handler.WorkflowNodeState{Phase: newPhase} + version := uint32(0) + if info := transition.Info().GetInfo(); info != nil && info.WorkflowNodeInfo != nil { + version = info.WorkflowNodeInfo.Version + } + + workflowNodeState := handler.WorkflowNodeState{Phase: newPhase, Version: version} err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) if err != nil { logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error()) diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index 8d14968d0..983e10508 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -22,6 +22,13 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" ) +type NodeStatusVersion uint32 + +const ( + NodeStatusVersion1 NodeStatusVersion = iota + NodeStatusVersion2 +) + type launchPlanHandler struct { launchPlan launchplan.Executor recoveryClient recovery.Client @@ -56,10 +63,11 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No if err != nil { return handler.UnknownTransition, err } - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionIDV2( parentNodeExecutionID, nCtx.CurrentAttempt(), ) + if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil } @@ -106,19 +114,38 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{ - WorkflowNodeInfo: &handler.WorkflowNodeInfo{LaunchedWorkflowID: childID}, + WorkflowNodeInfo: &handler.WorkflowNodeInfo{ + LaunchedWorkflowID: childID, + Version: uint32(NodeStatusVersion2), + }, })), nil } +func GetChildWorkflowExecutionForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) { + // Handle launch plan + if nCtx.NodeStateReader().GetWorkflowNodeState().Version == uint32(NodeStatusVersion2) { + return GetChildWorkflowExecutionIDV2( + parentNodeExecID, + nCtx.CurrentAttempt(), + ) + } + + return GetChildWorkflowExecutionID( + parentNodeExecID, + nCtx.CurrentAttempt(), + ) +} + func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { parentNodeExecutionID, err := getParentNodeExecutionID(nCtx) if err != nil { return handler.UnknownTransition, err } + // Handle launch plan - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { @@ -203,9 +230,9 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeEx if err != nil { return err } - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { // THIS SHOULD NEVER HAPPEN diff --git a/pkg/controller/nodes/subworkflow/util.go b/pkg/controller/nodes/subworkflow/util.go index 0b5bf715b..cedef8469 100644 --- a/pkg/controller/nodes/subworkflow/util.go +++ b/pkg/controller/nodes/subworkflow/util.go @@ -22,3 +22,26 @@ func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attem Name: name, }, nil } + +func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) { + name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))) + if err != nil { + return nil, err + } + + // Restriction on name is 20 chars + return &core.WorkflowExecutionIdentifier{ + Project: nodeExecID.ExecutionId.Project, + Domain: nodeExecID.ExecutionId.Domain, + Name: EnsureExecIDWithinLength(nodeExecID.ExecutionId.Name, name, maxLengthForSubWorkflow), + }, nil +} + +func EnsureExecIDWithinLength(execID, subName string, maxLength int) string { + maxLengthRemaining := maxLength - len(subName) + if len(execID) < maxLengthRemaining { + return execID + subName + } + + return execID[:maxLengthRemaining] + subName +} diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index 68f0d70bb..94c13735d 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -252,5 +252,6 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa t := s.GetOrCreateWorkflowStatus() t.SetWorkflowNodePhase(n.w.Phase) t.SetExecutionError(n.w.Error) + t.SetVersion(p.GetInfo().WorkflowNodeInfo.Version) } } From e73a7aa3cc9f2cff4c45cc0205e84eaf8867df5e Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Mon, 22 Aug 2022 22:41:36 -0700 Subject: [PATCH 02/12] cleanup Signed-off-by: Haytham Abuelfutuh --- pkg/apis/flyteworkflow/v1alpha1/iface.go | 10 ---------- pkg/apis/flyteworkflow/v1alpha1/node_status.go | 13 ------------- 2 files changed, 23 deletions(-) diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index b71ebd925..df1e42a20 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -246,7 +246,6 @@ type ExecutableBranchNode interface { } type ExecutableWorkflowNodeStatus interface { - Versioned GetWorkflowNodePhase() WorkflowNodePhase GetExecutionError() *core.ExecutionError } @@ -254,7 +253,6 @@ type ExecutableWorkflowNodeStatus interface { type MutableWorkflowNodeStatus interface { Mutable ExecutableWorkflowNodeStatus - MutableVersioned SetWorkflowNodePhase(phase WorkflowNodePhase) SetExecutionError(executionError *core.ExecutionError) } @@ -263,14 +261,6 @@ type Mutable interface { IsDirty() bool } -type Versioned interface { - GetVersion() uint32 -} - -type MutableVersioned interface { - SetVersion(version uint32) -} - type MutableNodeStatus interface { Mutable // Mutation API's diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 72a57fb6b..cc78b492a 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -17,18 +17,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type VersionedStruct struct { - version uint32 -} - -func (in *VersionedStruct) SetVersion(version uint32) { - in.version = version -} - -func (in VersionedStruct) GetVersion() uint32 { - return in.version -} - type MutableStruct struct { isDirty bool } @@ -170,7 +158,6 @@ const ( type WorkflowNodeStatus struct { MutableStruct - VersionedStruct Phase WorkflowNodePhase `json:"phase,omitempty"` ExecutionError *core.ExecutionError `json:"executionError,omitempty"` } From 4fb8fb1526edb08674a2c197a8e54e884f9bac56 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Mon, 22 Aug 2022 22:43:10 -0700 Subject: [PATCH 03/12] cleanup Signed-off-by: Haytham Abuelfutuh --- pkg/controller/nodes/transformers.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index 94c13735d..68f0d70bb 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -252,6 +252,5 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa t := s.GetOrCreateWorkflowStatus() t.SetWorkflowNodePhase(n.w.Phase) t.SetExecutionError(n.w.Error) - t.SetVersion(p.GetInfo().WorkflowNodeInfo.Version) } } From 312535facff274d89421105a265762c0535b1a75 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Tue, 23 Aug 2022 13:48:00 -0700 Subject: [PATCH 04/12] Use a CRD-level version instead Signed-off-by: Haytham Abuelfutuh --- pkg/apis/flyteworkflow/v1alpha1/iface.go | 1 + pkg/apis/flyteworkflow/v1alpha1/workflow.go | 22 +++++++++++++++++++ pkg/controller/handler.go | 11 ++++++++++ pkg/controller/nodes/handler/state.go | 5 ++--- .../nodes/handler/transition_info.go | 1 - pkg/controller/nodes/subworkflow/handler.go | 7 +----- .../nodes/subworkflow/launchplan.go | 3 +-- 7 files changed, 38 insertions(+), 12 deletions(-) diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index df1e42a20..1ceddd36f 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -443,6 +443,7 @@ type Meta interface { GetSecurityContext() core.SecurityContext IsInterruptible() bool GetEventVersion() EventVersion + GetDefinitionVersion() WorkflowDefinitionVersion GetRawOutputDataConfig() RawOutputDataConfig } diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/pkg/apis/flyteworkflow/v1alpha1/workflow.go index d96a87d14..11e73a167 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -23,6 +23,15 @@ const ShardKeyspaceSize = 32 const StartNodeID = "start-node" const EndNodeID = "end-node" +type WorkflowDefinitionVersion uint32 + +var LatestWorkflowDefinitionVersion = WorkflowDefinitionVersion1 + +const ( + WorkflowDefinitionVersion0 WorkflowDefinitionVersion = iota + WorkflowDefinitionVersion1 +) + // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -82,12 +91,25 @@ func (in *FlyteWorkflow) GetEventVersion() EventVersion { return EventVersion0 } +func (in *FlyteWorkflow) GetDefinitionVersion() WorkflowDefinitionVersion { + if meta := in.WorkflowMeta; meta != nil && meta.DefinitionVersion != nil { + return *meta.DefinitionVersion + } + + return WorkflowDefinitionVersion0 +} + func (in *FlyteWorkflow) GetExecutionConfig() ExecutionConfig { return in.ExecutionConfig } type WorkflowMeta struct { EventVersion EventVersion `json:"eventVersion,omitempty"` + // DefinitionVersion allows propeller code that populates the CRD to evolve (in backward incompatible ways) without + // affecting in-flight executions. Once an execution starts, propeller will populate this field with the current or + // latest version. If a newer propeller version is deployed midway that comes with a newer version, code that relies + // on the latest version should be gated behind this. + DefinitionVersion *WorkflowDefinitionVersion `json:"defVersion,omitempty"` } type EventVersion int diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 95fd46cb1..837bf4d27 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -83,6 +83,16 @@ func (p *Propeller) Initialize(ctx context.Context) error { return p.workflowExecutor.Initialize(ctx) } +func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion) { + if wf.WorkflowMeta == nil { + wf.WorkflowMeta = &v1alpha1.WorkflowMeta{} + } + + if wf.WorkflowMeta.DefinitionVersion == nil { + wf.WorkflowMeta.DefinitionVersion = &version + } +} + // TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state. // The desired state here is the entire workflow is completed, actual state is each nodes current execution state. func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) { @@ -120,6 +130,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F if !mutableW.GetExecutionStatus().IsTerminated() { var err error SetFinalizerIfEmpty(mutableW, FinalizerKey) + SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion) func() { t := p.metrics.RawWorkflowTraversalTime.Start(ctx) diff --git a/pkg/controller/nodes/handler/state.go b/pkg/controller/nodes/handler/state.go index 3512a6c25..2e456d817 100644 --- a/pkg/controller/nodes/handler/state.go +++ b/pkg/controller/nodes/handler/state.go @@ -35,9 +35,8 @@ type DynamicNodeState struct { } type WorkflowNodeState struct { - Phase v1alpha1.WorkflowNodePhase - Error *core.ExecutionError - Version uint32 + Phase v1alpha1.WorkflowNodePhase + Error *core.ExecutionError } type NodeStateWriter interface { diff --git a/pkg/controller/nodes/handler/transition_info.go b/pkg/controller/nodes/handler/transition_info.go index d9c31a60c..d12ff9f8a 100644 --- a/pkg/controller/nodes/handler/transition_info.go +++ b/pkg/controller/nodes/handler/transition_info.go @@ -39,7 +39,6 @@ type DynamicNodeInfo struct { type WorkflowNodeInfo struct { LaunchedWorkflowID *core.WorkflowExecutionIdentifier - Version uint32 } type BranchNodeInfo struct { diff --git a/pkg/controller/nodes/subworkflow/handler.go b/pkg/controller/nodes/subworkflow/handler.go index 7fefcd880..bf7b5c393 100644 --- a/pkg/controller/nodes/subworkflow/handler.go +++ b/pkg/controller/nodes/subworkflow/handler.go @@ -58,12 +58,7 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecu return transition, err } - version := uint32(0) - if info := transition.Info().GetInfo(); info != nil && info.WorkflowNodeInfo != nil { - version = info.WorkflowNodeInfo.Version - } - - workflowNodeState := handler.WorkflowNodeState{Phase: newPhase, Version: version} + workflowNodeState := handler.WorkflowNodeState{Phase: newPhase} err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) if err != nil { logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error()) diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index 983e10508..0aa83fe55 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -116,14 +116,13 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{ WorkflowNodeInfo: &handler.WorkflowNodeInfo{ LaunchedWorkflowID: childID, - Version: uint32(NodeStatusVersion2), }, })), nil } func GetChildWorkflowExecutionForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) { // Handle launch plan - if nCtx.NodeStateReader().GetWorkflowNodeState().Version == uint32(NodeStatusVersion2) { + if nCtx.ExecutionContext().GetDefinitionVersion() == v1alpha1.WorkflowDefinitionVersion1 { return GetChildWorkflowExecutionIDV2( parentNodeExecID, nCtx.CurrentAttempt(), From 8c7ce2dc809544fd31c0eaad735e56619fe3170d Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Tue, 23 Aug 2022 13:51:07 -0700 Subject: [PATCH 05/12] cleanup Signed-off-by: Haytham Abuelfutuh --- .../nodes/subworkflow/launchplan.go | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index 0aa83fe55..037c41b88 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -22,13 +22,6 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" ) -type NodeStatusVersion uint32 - -const ( - NodeStatusVersion1 NodeStatusVersion = iota - NodeStatusVersion2 -) - type launchPlanHandler struct { launchPlan launchplan.Executor recoveryClient recovery.Client @@ -63,9 +56,10 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No if err != nil { return handler.UnknownTransition, err } - childID, err := GetChildWorkflowExecutionIDV2( + + childID, err := GetChildWorkflowExecutionIDForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { @@ -114,22 +108,20 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{ - WorkflowNodeInfo: &handler.WorkflowNodeInfo{ - LaunchedWorkflowID: childID, - }, + WorkflowNodeInfo: &handler.WorkflowNodeInfo{LaunchedWorkflowID: childID}, })), nil } -func GetChildWorkflowExecutionForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) { +func GetChildWorkflowExecutionIDForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) { // Handle launch plan - if nCtx.ExecutionContext().GetDefinitionVersion() == v1alpha1.WorkflowDefinitionVersion1 { - return GetChildWorkflowExecutionIDV2( + if nCtx.ExecutionContext().GetDefinitionVersion() == v1alpha1.WorkflowDefinitionVersion0 { + return GetChildWorkflowExecutionID( parentNodeExecID, nCtx.CurrentAttempt(), ) } - return GetChildWorkflowExecutionID( + return GetChildWorkflowExecutionIDV2( parentNodeExecID, nCtx.CurrentAttempt(), ) @@ -142,7 +134,7 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand } // Handle launch plan - childID, err := GetChildWorkflowExecutionForExecution( + childID, err := GetChildWorkflowExecutionIDForExecution( parentNodeExecutionID, nCtx, ) @@ -229,7 +221,7 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeEx if err != nil { return err } - childID, err := GetChildWorkflowExecutionForExecution( + childID, err := GetChildWorkflowExecutionIDForExecution( parentNodeExecutionID, nCtx, ) From 7eb032764d9f164ee9e21185921c2e80465f0140 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Tue, 23 Aug 2022 14:11:46 -0700 Subject: [PATCH 06/12] Use fnv64 to create a hash for child workflow executions Signed-off-by: Haytham Abuelfutuh --- cmd/kubectl-flyte/cmd/printers/node.go | 2 +- go.mod | 2 ++ go.sum | 4 ++-- pkg/controller/nodes/common/utils.go | 7 ++++--- .../nodes/dynamic/dynamic_workflow_test.go | 8 ++++---- pkg/controller/nodes/dynamic/utils.go | 2 +- pkg/controller/nodes/subworkflow/util.go | 17 +++++------------ pkg/controller/nodes/task/taskexec_context.go | 2 +- 8 files changed, 20 insertions(+), 24 deletions(-) diff --git a/cmd/kubectl-flyte/cmd/printers/node.go b/cmd/kubectl-flyte/cmd/printers/node.go index 237df5bea..ae00fb411 100644 --- a/cmd/kubectl-flyte/cmd/printers/node.go +++ b/cmd/kubectl-flyte/cmd/printers/node.go @@ -58,7 +58,7 @@ func (p NodeStatusPrinter) BaseNodeInfo(node v1alpha1.BaseNode, nodeStatus v1alp } func (p NodeStatusPrinter) NodeInfo(wName string, node v1alpha1.BaseNode, nodeStatus v1alpha1.ExecutableNodeStatus) []string { - resourceName, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, wName, node.GetID(), strconv.Itoa(int(nodeStatus.GetAttempts()))) + resourceName, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, []string{wName, node.GetID(), strconv.Itoa(int(nodeStatus.GetAttempts()))}) if err != nil { resourceName = "na" } diff --git a/go.mod b/go.mod index 2bb15e190..750b53ccd 100644 --- a/go.mod +++ b/go.mod @@ -146,3 +146,5 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d + +replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.11-0.20220823042055-ccaa0ca86e7a diff --git a/go.sum b/go.sum index 4621e7bdf..a71b51acd 100644 --- a/go.sum +++ b/go.sum @@ -294,8 +294,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.1.10 h1:Bus/JUto0oBTjAS4EBN7EITeuZNS4naq+uFpj+ydaW4= github.com/flyteorg/flyteidl v1.1.10/go.mod h1:f1tvw5CDjqmrzNxKpRYr6BdAhHL8f7Wp1Duxl0ZOV4g= -github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4= -github.com/flyteorg/flyteplugins v1.0.10/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= +github.com/flyteorg/flyteplugins v1.0.11-0.20220823042055-ccaa0ca86e7a h1:zVh/pI7G0QF+85+fAC2TwWaXhNCzgJevTxwOpPDS/qk= +github.com/flyteorg/flyteplugins v1.0.11-0.20220823042055-ccaa0ca86e7a/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= github.com/flyteorg/flytestdlib v1.0.5 h1:80A/vfpAJl+pgU6vxccbsYApZPrvyGhOIsCAFngsjnk= github.com/flyteorg/flytestdlib v1.0.5/go.mod h1:WTe0k3DmmrKFjj3hwiIbjjdCK89X63MBzBbXhQ4Yxf0= diff --git a/pkg/controller/nodes/common/utils.go b/pkg/controller/nodes/common/utils.go index af5a25091..e6a5fb6fd 100644 --- a/pkg/controller/nodes/common/utils.go +++ b/pkg/controller/nodes/common/utils.go @@ -11,7 +11,7 @@ import ( const maxUniqueIDLength = 20 -// The UniqueId of a node is unique within a given workflow execution. +// GenerateUniqueID is the UniqueId of a node is unique within a given workflow execution. // In order to achieve that we track the lineage of the node. // To compute the uniqueID of a node, we use the uniqueID and retry attempt of the parent node // For nodes in level 0, there is no parent, and parentInfo is nil @@ -24,10 +24,11 @@ func GenerateUniqueID(parentInfo executors.ImmutableParentInfo, nodeID string) ( parentRetryAttempt = strconv.Itoa(int(parentInfo.CurrentAttempt())) } - return encoding.FixedLengthUniqueIDForParts(maxUniqueIDLength, parentUniqueID, parentRetryAttempt, nodeID) + return encoding.FixedLengthUniqueIDForParts(maxUniqueIDLength, []string{parentUniqueID, parentRetryAttempt, nodeID}) } -// When creating parentInfo, the unique id of parent is dependent on the unique id and the current attempt of the grand parent to track the lineage. +// CreateParentInfo creates a unique parent id, the unique id of parent is dependent on the unique id and the current +// attempt of the grandparent to track the lineage. func CreateParentInfo(grandParentInfo executors.ImmutableParentInfo, nodeID string, parentAttempt uint32) (executors.ImmutableParentInfo, error) { uniqueID, err := GenerateUniqueID(grandParentInfo, nodeID) if err != nil { diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index 4fbf42f6e..184b0ee6c 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -207,7 +207,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t assert.NotNil(t, dCtx.subWorkflowClosure) assert.NotNil(t, dCtx.execContext) assert.NotNil(t, dCtx.execContext.GetParentInfo()) - expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1") + expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"}) assert.Nil(t, err) assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID()) assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt()) @@ -276,7 +276,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t assert.NotNil(t, dCtx.subWorkflowClosure) assert.NotNil(t, dCtx.execContext) assert.NotNil(t, dCtx.execContext.GetParentInfo()) - expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "", "", "n1") + expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"", "", "n1"}) assert.Nil(t, err) assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID()) assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt()) @@ -430,7 +430,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t assert.NotNil(t, dCtx.subWorkflow) assert.NotNil(t, dCtx.execContext) assert.NotNil(t, dCtx.execContext.GetParentInfo()) - expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1") + expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"}) assert.Nil(t, err) assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID()) assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt()) @@ -575,7 +575,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t assert.NotNil(t, dCtx.subWorkflow) assert.NotNil(t, dCtx.execContext) assert.NotNil(t, dCtx.execContext.GetParentInfo()) - expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, "c1", "2", "n1") + expectedParentUniqueID, err := encoding.FixedLengthUniqueIDForParts(20, []string{"c1", "2", "n1"}) assert.Nil(t, err) assert.Equal(t, expectedParentUniqueID, dCtx.execContext.GetParentInfo().GetUniqueID()) assert.Equal(t, uint32(1), dCtx.execContext.GetParentInfo().CurrentAttempt()) diff --git a/pkg/controller/nodes/dynamic/utils.go b/pkg/controller/nodes/dynamic/utils.go index dea537178..d08845856 100644 --- a/pkg/controller/nodes/dynamic/utils.go +++ b/pkg/controller/nodes/dynamic/utils.go @@ -28,7 +28,7 @@ func underlyingInterface(ctx context.Context, taskReader handler.TaskReader) (*c } func hierarchicalNodeID(parentNodeID, retryAttempt, nodeID string) (string, error) { - return encoding.FixedLengthUniqueIDForParts(20, parentNodeID, retryAttempt, nodeID) + return encoding.FixedLengthUniqueIDForParts(20, []string{parentNodeID, retryAttempt, nodeID}) } func updateBindingNodeIDsWithLineage(parentNodeID, retryAttempt string, binding *core.BindingData) (err error) { diff --git a/pkg/controller/nodes/subworkflow/util.go b/pkg/controller/nodes/subworkflow/util.go index cedef8469..9eed0c8a5 100644 --- a/pkg/controller/nodes/subworkflow/util.go +++ b/pkg/controller/nodes/subworkflow/util.go @@ -11,10 +11,11 @@ import ( const maxLengthForSubWorkflow = 20 func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) { - name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))) + name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, []string{nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))}) if err != nil { return nil, err } + // Restriction on name is 20 chars return &core.WorkflowExecutionIdentifier{ Project: nodeExecID.ExecutionId.Project, @@ -24,7 +25,8 @@ func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attem } func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) { - name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))) + name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, []string{nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))}, + encoding.NewAlgorithmOption(encoding.Algorithm64)) if err != nil { return nil, err } @@ -33,15 +35,6 @@ func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, att return &core.WorkflowExecutionIdentifier{ Project: nodeExecID.ExecutionId.Project, Domain: nodeExecID.ExecutionId.Domain, - Name: EnsureExecIDWithinLength(nodeExecID.ExecutionId.Name, name, maxLengthForSubWorkflow), + Name: name, }, nil } - -func EnsureExecIDWithinLength(execID, subName string, maxLength int) string { - maxLengthRemaining := maxLength - len(subName) - if len(execID) < maxLengthRemaining { - return execID + subName - } - - return execID[:maxLengthRemaining] + subName -} diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index ceb5849d6..8a81fdf60 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -204,7 +204,7 @@ func convertTaskResourcesToRequirements(taskResources v1alpha1.TaskResources) *v // access to this location and can be passed in per execution. // the function also returns the uniqueID generated func ComputeRawOutputPrefix(ctx context.Context, length int, nCtx handler.NodeExecutionContext, currentNodeUniqueID v1alpha1.NodeID, currentAttempt uint32) (io.RawOutputPaths, string, error) { - uniqueID, err := encoding.FixedLengthUniqueIDForParts(length, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(currentAttempt))) + uniqueID, err := encoding.FixedLengthUniqueIDForParts(length, []string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(currentAttempt))}) if err != nil { // SHOULD never really happen return nil, uniqueID, err From 589b1f7745fbe1ae758fe0a6b9b37bc068a2401d Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Wed, 31 Aug 2022 12:52:38 -0700 Subject: [PATCH 07/12] Move DefinitionVersion to Status of the CRD Signed-off-by: Haytham Abuelfutuh --- pkg/apis/flyteworkflow/v1alpha1/workflow.go | 9 ++------- pkg/apis/flyteworkflow/v1alpha1/workflow_status.go | 6 ++++++ pkg/controller/config/config.go | 2 +- pkg/controller/handler.go | 10 +++------- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 11e73a167..4aac6ef0f 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -92,8 +92,8 @@ func (in *FlyteWorkflow) GetEventVersion() EventVersion { } func (in *FlyteWorkflow) GetDefinitionVersion() WorkflowDefinitionVersion { - if meta := in.WorkflowMeta; meta != nil && meta.DefinitionVersion != nil { - return *meta.DefinitionVersion + if in.Status.DefinitionVersion != nil { + return *in.Status.DefinitionVersion } return WorkflowDefinitionVersion0 @@ -105,11 +105,6 @@ func (in *FlyteWorkflow) GetExecutionConfig() ExecutionConfig { type WorkflowMeta struct { EventVersion EventVersion `json:"eventVersion,omitempty"` - // DefinitionVersion allows propeller code that populates the CRD to evolve (in backward incompatible ways) without - // affecting in-flight executions. Once an execution starts, propeller will populate this field with the current or - // latest version. If a newer propeller version is deployed midway that comes with a newer version, code that relies - // on the latest version should be gated behind this. - DefinitionVersion *WorkflowDefinitionVersion `json:"defVersion,omitempty"` } type EventVersion int diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go b/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go index 7abc6efca..e4cae0306 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/workflow_status.go @@ -35,6 +35,12 @@ type WorkflowStatus struct { // Stores the Error during the Execution of the Workflow. It is optional and usually associated with Failing/Failed state only Error *ExecutionError `json:"error,omitempty"` + // DefinitionVersion allows propeller code that populates the CRD to evolve (in backward incompatible ways) without + // affecting in-flight executions. Once an execution starts, propeller will populate this field with the current or + // latest version. If a newer propeller version is deployed midway that comes with a newer version, code that relies + // on the latest version should be gated behind this. + DefinitionVersion *WorkflowDefinitionVersion `json:"defVersion,omitempty"` + // non-Serialized fields DataReferenceConstructor storage.ReferenceConstructor `json:"-"` } diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 20e3f90d4..1265ba9f7 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -124,7 +124,7 @@ type Config struct { MasterURL string `json:"master"` Workers int `json:"workers" pflag:",Number of threads to process workflows"` WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:",Frequency of re-evaluating workflows"` - DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:",Frequency of re-evaluating downstream tasks"` + DownstreamEval config.Duration `json:"downstream-eval-duration" pflakg:",Frequency of re-evaluating downstream tasks"` LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller"` ProfilerPort config.Port `json:"prof-port" pflag:",Profiler port"` MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."` diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 837bf4d27..ecd971ffd 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -78,18 +78,14 @@ type Propeller struct { cfg *config.Config } -// Initializes all downstream executors +// Initialize initializes all downstream executors func (p *Propeller) Initialize(ctx context.Context) error { return p.workflowExecutor.Initialize(ctx) } func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion) { - if wf.WorkflowMeta == nil { - wf.WorkflowMeta = &v1alpha1.WorkflowMeta{} - } - - if wf.WorkflowMeta.DefinitionVersion == nil { - wf.WorkflowMeta.DefinitionVersion = &version + if wf.Status.DefinitionVersion == nil { + wf.Status.DefinitionVersion = &version } } From 289e0a9df0a630022e6fef7cc47b70ec7f2b60ff Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Thu, 8 Sep 2022 15:56:47 -0700 Subject: [PATCH 08/12] Update to the released flyteplugins Signed-off-by: Haytham Abuelfutuh --- go.mod | 4 +--- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 750b53ccd..6c15363b5 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 github.com/flyteorg/flyteidl v1.1.10 - github.com/flyteorg/flyteplugins v1.0.10 + github.com/flyteorg/flyteplugins v1.0.13 github.com/flyteorg/flytestdlib v1.0.5 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible @@ -146,5 +146,3 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d - -replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.11-0.20220823042055-ccaa0ca86e7a diff --git a/go.sum b/go.sum index a71b51acd..8be4efbec 100644 --- a/go.sum +++ b/go.sum @@ -294,8 +294,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.1.10 h1:Bus/JUto0oBTjAS4EBN7EITeuZNS4naq+uFpj+ydaW4= github.com/flyteorg/flyteidl v1.1.10/go.mod h1:f1tvw5CDjqmrzNxKpRYr6BdAhHL8f7Wp1Duxl0ZOV4g= -github.com/flyteorg/flyteplugins v1.0.11-0.20220823042055-ccaa0ca86e7a h1:zVh/pI7G0QF+85+fAC2TwWaXhNCzgJevTxwOpPDS/qk= -github.com/flyteorg/flyteplugins v1.0.11-0.20220823042055-ccaa0ca86e7a/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= +github.com/flyteorg/flyteplugins v1.0.13 h1:mNGImGSdGsYUjmB9vUzZAWqh/h7FCH+MyMRPS78z6Z0= +github.com/flyteorg/flyteplugins v1.0.13/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= github.com/flyteorg/flytestdlib v1.0.5 h1:80A/vfpAJl+pgU6vxccbsYApZPrvyGhOIsCAFngsjnk= github.com/flyteorg/flytestdlib v1.0.5/go.mod h1:WTe0k3DmmrKFjj3hwiIbjjdCK89X63MBzBbXhQ4Yxf0= From 7b5f829c2385f7326c6b624196edd7e8c1d4a995 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Thu, 8 Sep 2022 16:06:24 -0700 Subject: [PATCH 09/12] Regenerate Signed-off-by: Haytham Abuelfutuh --- .../v1alpha1/mocks/ExecutableWorkflow.go | 32 +++++++++++++++++++ pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go | 32 +++++++++++++++++++ .../v1alpha1/mocks/MetaExtended.go | 32 +++++++++++++++++++ pkg/controller/config/config.go | 2 +- .../executors/mocks/execution_context.go | 32 +++++++++++++++++++ .../mocks/immutable_execution_context.go | 32 +++++++++++++++++++ 6 files changed, 161 insertions(+), 1 deletion(-) diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go index 8a582f63e..42c8e694b 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go @@ -195,6 +195,38 @@ func (_m *ExecutableWorkflow) GetCreationTimestamp() v1.Time { return r0 } +type ExecutableWorkflow_GetDefinitionVersion struct { + *mock.Call +} + +func (_m ExecutableWorkflow_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *ExecutableWorkflow_GetDefinitionVersion { + return &ExecutableWorkflow_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableWorkflow) OnGetDefinitionVersion() *ExecutableWorkflow_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &ExecutableWorkflow_GetDefinitionVersion{Call: c_call} +} + +func (_m *ExecutableWorkflow) OnGetDefinitionVersionMatch(matchers ...interface{}) *ExecutableWorkflow_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &ExecutableWorkflow_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *ExecutableWorkflow) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type ExecutableWorkflow_GetEventVersion struct { *mock.Call } diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go index 4afbcae33..4d38f70c9 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go @@ -84,6 +84,38 @@ func (_m *Meta) GetCreationTimestamp() v1.Time { return r0 } +type Meta_GetDefinitionVersion struct { + *mock.Call +} + +func (_m Meta_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *Meta_GetDefinitionVersion { + return &Meta_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *Meta) OnGetDefinitionVersion() *Meta_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &Meta_GetDefinitionVersion{Call: c_call} +} + +func (_m *Meta) OnGetDefinitionVersionMatch(matchers ...interface{}) *Meta_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &Meta_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *Meta) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type Meta_GetEventVersion struct { *mock.Call } diff --git a/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go b/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go index e3f395bc3..5e0a048ad 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go +++ b/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go @@ -118,6 +118,38 @@ func (_m *MetaExtended) GetCreationTimestamp() v1.Time { return r0 } +type MetaExtended_GetDefinitionVersion struct { + *mock.Call +} + +func (_m MetaExtended_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *MetaExtended_GetDefinitionVersion { + return &MetaExtended_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *MetaExtended) OnGetDefinitionVersion() *MetaExtended_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &MetaExtended_GetDefinitionVersion{Call: c_call} +} + +func (_m *MetaExtended) OnGetDefinitionVersionMatch(matchers ...interface{}) *MetaExtended_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &MetaExtended_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *MetaExtended) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type MetaExtended_GetEventVersion struct { *mock.Call } diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 1265ba9f7..20e3f90d4 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -124,7 +124,7 @@ type Config struct { MasterURL string `json:"master"` Workers int `json:"workers" pflag:",Number of threads to process workflows"` WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:",Frequency of re-evaluating workflows"` - DownstreamEval config.Duration `json:"downstream-eval-duration" pflakg:",Frequency of re-evaluating downstream tasks"` + DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:",Frequency of re-evaluating downstream tasks"` LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller"` ProfilerPort config.Port `json:"prof-port" pflag:",Profiler port"` MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."` diff --git a/pkg/controller/executors/mocks/execution_context.go b/pkg/controller/executors/mocks/execution_context.go index a949ed4dc..cc919f641 100644 --- a/pkg/controller/executors/mocks/execution_context.go +++ b/pkg/controller/executors/mocks/execution_context.go @@ -152,6 +152,38 @@ func (_m *ExecutionContext) GetCreationTimestamp() v1.Time { return r0 } +type ExecutionContext_GetDefinitionVersion struct { + *mock.Call +} + +func (_m ExecutionContext_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *ExecutionContext_GetDefinitionVersion { + return &ExecutionContext_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutionContext) OnGetDefinitionVersion() *ExecutionContext_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &ExecutionContext_GetDefinitionVersion{Call: c_call} +} + +func (_m *ExecutionContext) OnGetDefinitionVersionMatch(matchers ...interface{}) *ExecutionContext_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &ExecutionContext_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *ExecutionContext) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type ExecutionContext_GetEventVersion struct { *mock.Call } diff --git a/pkg/controller/executors/mocks/immutable_execution_context.go b/pkg/controller/executors/mocks/immutable_execution_context.go index 6c899c811..1b4dfb6d0 100644 --- a/pkg/controller/executors/mocks/immutable_execution_context.go +++ b/pkg/controller/executors/mocks/immutable_execution_context.go @@ -85,6 +85,38 @@ func (_m *ImmutableExecutionContext) GetCreationTimestamp() v1.Time { return r0 } +type ImmutableExecutionContext_GetDefinitionVersion struct { + *mock.Call +} + +func (_m ImmutableExecutionContext_GetDefinitionVersion) Return(_a0 v1alpha1.WorkflowDefinitionVersion) *ImmutableExecutionContext_GetDefinitionVersion { + return &ImmutableExecutionContext_GetDefinitionVersion{Call: _m.Call.Return(_a0)} +} + +func (_m *ImmutableExecutionContext) OnGetDefinitionVersion() *ImmutableExecutionContext_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion") + return &ImmutableExecutionContext_GetDefinitionVersion{Call: c_call} +} + +func (_m *ImmutableExecutionContext) OnGetDefinitionVersionMatch(matchers ...interface{}) *ImmutableExecutionContext_GetDefinitionVersion { + c_call := _m.On("GetDefinitionVersion", matchers...) + return &ImmutableExecutionContext_GetDefinitionVersion{Call: c_call} +} + +// GetDefinitionVersion provides a mock function with given fields: +func (_m *ImmutableExecutionContext) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + ret := _m.Called() + + var r0 v1alpha1.WorkflowDefinitionVersion + if rf, ok := ret.Get(0).(func() v1alpha1.WorkflowDefinitionVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.WorkflowDefinitionVersion) + } + + return r0 +} + type ImmutableExecutionContext_GetEventVersion struct { *mock.Call } From c30c0c1f9243b13922da9dc984e8a6f9e50f6626 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Thu, 8 Sep 2022 16:11:12 -0700 Subject: [PATCH 10/12] fix unit tests Signed-off-by: Haytham Abuelfutuh --- pkg/controller/nodes/resolve_test.go | 4 ++++ pkg/controller/nodes/subworkflow/handler_test.go | 7 ++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/controller/nodes/resolve_test.go b/pkg/controller/nodes/resolve_test.go index 9069a1035..533e0ebfd 100644 --- a/pkg/controller/nodes/resolve_test.go +++ b/pkg/controller/nodes/resolve_test.go @@ -34,6 +34,10 @@ type dummyBaseWorkflow struct { Interruptible bool } +func (d *dummyBaseWorkflow) GetDefinitionVersion() v1alpha1.WorkflowDefinitionVersion { + return v1alpha1.WorkflowDefinitionVersion1 +} + func (d *dummyBaseWorkflow) GetParentInfo() executors.ImmutableParentInfo { return nil } diff --git a/pkg/controller/nodes/subworkflow/handler_test.go b/pkg/controller/nodes/subworkflow/handler_test.go index 13a997a7c..de95f41e2 100644 --- a/pkg/controller/nodes/subworkflow/handler_test.go +++ b/pkg/controller/nodes/subworkflow/handler_test.go @@ -156,7 +156,6 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { recoveryClient := &mocks5.Client{} t.Run("happy v0", func(t *testing.T) { - mockLPExec := &mocks.Executor{} h := New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnLaunchMatch( @@ -173,10 +172,11 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { ).Return(nil) nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseUndefined, mockNode, mockNodeStatus) + c := nCtx.ExecutionContext().(*execMocks.ExecutionContext) + c.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) s, err := h.Handle(ctx, nCtx) assert.NoError(t, err) assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase()) - c := nCtx.ExecutionContext().(*execMocks.ExecutionContext) c.AssertCalled(t, "IncrementParallelism") }) @@ -198,10 +198,11 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { ).Return(nil) nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseUndefined, mockNode, mockNodeStatus) + c := nCtx.ExecutionContext().(*execMocks.ExecutionContext) + c.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) s, err := h.Handle(ctx, nCtx) assert.NoError(t, err) assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase()) - c := nCtx.ExecutionContext().(*execMocks.ExecutionContext) c.AssertCalled(t, "IncrementParallelism") }) } From 17fa6e2f4312537add1cda1d5045495f3e6df1fb Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Thu, 8 Sep 2022 20:22:25 -0700 Subject: [PATCH 11/12] more fixes Signed-off-by: Haytham Abuelfutuh --- pkg/controller/nodes/subworkflow/handler_test.go | 10 ++++++---- pkg/controller/nodes/subworkflow/launchplan_test.go | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/controller/nodes/subworkflow/handler_test.go b/pkg/controller/nodes/subworkflow/handler_test.go index de95f41e2..17d9ee2d7 100644 --- a/pkg/controller/nodes/subworkflow/handler_test.go +++ b/pkg/controller/nodes/subworkflow/handler_test.go @@ -113,6 +113,7 @@ func createNodeContextWithVersion(phase v1alpha1.WorkflowNodePhase, n v1alpha1.E ex.OnGetAnnotations().Return(nil) ex.OnGetLabels().Return(nil) ex.OnGetRawOutputDataConfig().Return(v1alpha1.RawOutputDataConfig{}) + ex.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) nCtx.OnExecutionContext().Return(ex) @@ -172,11 +173,10 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { ).Return(nil) nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseUndefined, mockNode, mockNodeStatus) - c := nCtx.ExecutionContext().(*execMocks.ExecutionContext) - c.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) s, err := h.Handle(ctx, nCtx) assert.NoError(t, err) assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase()) + c := nCtx.ExecutionContext().(*execMocks.ExecutionContext) c.AssertCalled(t, "IncrementParallelism") }) @@ -198,11 +198,10 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { ).Return(nil) nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseUndefined, mockNode, mockNodeStatus) - c := nCtx.ExecutionContext().(*execMocks.ExecutionContext) - c.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) s, err := h.Handle(ctx, nCtx) assert.NoError(t, err) assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase()) + c := nCtx.ExecutionContext().(*execMocks.ExecutionContext) c.AssertCalled(t, "IncrementParallelism") }) } @@ -322,6 +321,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { ).Return(nil) eCtx := &execMocks.ExecutionContext{} + eCtx.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) nCtx.OnExecutionContext().Return(eCtx) eCtx.OnGetName().Return("test") err := h.Abort(ctx, nCtx, "test") @@ -343,6 +343,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { ).Return(nil) eCtx := &execMocks.ExecutionContext{} + eCtx.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) nCtx.OnExecutionContext().Return(eCtx) eCtx.OnGetName().Return("test") err := h.Abort(ctx, nCtx, "test") @@ -363,6 +364,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus) eCtx := &execMocks.ExecutionContext{} + eCtx.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) nCtx.OnExecutionContext().Return(eCtx) eCtx.OnGetName().Return("test") diff --git a/pkg/controller/nodes/subworkflow/launchplan_test.go b/pkg/controller/nodes/subworkflow/launchplan_test.go index 6022ccf2e..4c8f6806e 100644 --- a/pkg/controller/nodes/subworkflow/launchplan_test.go +++ b/pkg/controller/nodes/subworkflow/launchplan_test.go @@ -67,7 +67,6 @@ func TestSubWorkflowHandler_StartLaunchPlan(t *testing.T) { mockNodeStatus.On("GetAttempts").Return(attempts) t.Run("happy", func(t *testing.T) { - mockLPExec := &mocks.Executor{} h := launchPlanHandler{ @@ -248,6 +247,7 @@ func TestSubWorkflowHandler_StartLaunchPlan(t *testing.T) { nCtx.OnNodeExecutionMetadata().Return(nm) ectx := &execMocks.ExecutionContext{} + ectx.OnGetDefinitionVersion().Return(v1alpha1.WorkflowDefinitionVersion1) ectx.OnGetEventVersion().Return(1) ectx.OnGetParentInfo().Return(nil) ectx.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{ From cf6e615bf096ef553973b73347faf1158edcfca6 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Thu, 8 Sep 2022 21:20:26 -0700 Subject: [PATCH 12/12] fix old unit test Signed-off-by: Haytham Abuelfutuh --- cmd/kubectl-flyte/cmd/string_map_value.go | 2 +- cmd/kubectl-flyte/cmd/string_map_value_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/kubectl-flyte/cmd/string_map_value.go b/cmd/kubectl-flyte/cmd/string_map_value.go index 6f067ab20..b0e7e1380 100644 --- a/cmd/kubectl-flyte/cmd/string_map_value.go +++ b/cmd/kubectl-flyte/cmd/string_map_value.go @@ -22,7 +22,7 @@ func newStringMapValue() *stringMapValue { var entryRegex = regexp.MustCompile("(?P[^,]+)=(?P[^,]+)") -// Parses val into a map. Accepted format: a=1,b=2 +// Set parses val into a map. Accepted format: a=1,b=2 func (s *stringMapValue) Set(val string) error { matches := entryRegex.FindAllStringSubmatch(val, -1) out := make(map[string]string, len(matches)) diff --git a/cmd/kubectl-flyte/cmd/string_map_value_test.go b/cmd/kubectl-flyte/cmd/string_map_value_test.go index ff3ae8d17..1c2ed2873 100644 --- a/cmd/kubectl-flyte/cmd/string_map_value_test.go +++ b/cmd/kubectl-flyte/cmd/string_map_value_test.go @@ -23,7 +23,7 @@ func formatArg(values map[string]string) string { func randSpaces() string { res := "" - for cnt := rand.Int() % 10; cnt > 0; cnt-- { // nolint: gas + for cnt := rand.Int()%10 + 1; cnt > 0; cnt-- { // nolint: gas res += " " }