diff --git a/flyteadmin/pkg/common/mocks/storage.go b/flyteadmin/pkg/common/mocks/storage.go index 7e91bf0485..ca682c321e 100644 --- a/flyteadmin/pkg/common/mocks/storage.go +++ b/flyteadmin/pkg/common/mocks/storage.go @@ -83,6 +83,10 @@ func (t *TestDataStore) Delete(ctx context.Context, reference storage.DataRefere return t.DeleteCb(ctx, reference) } +func (t *TestDataStore) DeepCopyReferenceConstructor() storage.ReferenceConstructor { + return nil +} + func GetMockStorageClient() *storage.DataStore { mockStorageClient := TestDataStore{ Store: make(map[storage.DataReference][]byte), diff --git a/flyteadmin/pkg/workflowengine/impl/prepare_execution.go b/flyteadmin/pkg/workflowengine/impl/prepare_execution.go index 91642599a6..b4b47cb861 100644 --- a/flyteadmin/pkg/workflowengine/impl/prepare_execution.go +++ b/flyteadmin/pkg/workflowengine/impl/prepare_execution.go @@ -28,7 +28,7 @@ func addPermissions(securityCtx *core.SecurityContext, roleNameKey string, flyte if securityCtx == nil || securityCtx.RunAs == nil { return } - flyteWf.SecurityContext = *securityCtx + flyteWf.SecurityContext = v1alpha1.SecurityContext{SecurityContext: *securityCtx} if len(securityCtx.RunAs.IamRole) > 0 { if flyteWf.Annotations == nil { flyteWf.Annotations = map[string]string{} diff --git a/flytepropeller/Makefile b/flytepropeller/Makefile index 351ff9ed03..eb8d727cdf 100644 --- a/flytepropeller/Makefile +++ b/flytepropeller/Makefile @@ -37,7 +37,7 @@ cross_compile: go build -o bin/cross/kubectl-flyte ./cmd/kubectl-flyte/main.go op_code_generate: - @RESOURCE_NAME=flyteworkflow OPERATOR_PKG=github.com/flyteorg/flytepropeller ./hack/update-codegen.sh + @RESOURCE_NAME=flyteworkflow OPERATOR_PKG=github.com/flyteorg/flyte/flytepropeller ./hack/update-codegen.sh benchmark: mkdir -p ./bin/benchmark diff --git a/flytepropeller/hack/update-codegen.sh b/flytepropeller/hack/update-codegen.sh index 82a1ef21fd..6c177456c9 100755 --- a/flytepropeller/hack/update-codegen.sh +++ b/flytepropeller/hack/update-codegen.sh @@ -36,7 +36,7 @@ bash ${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \ ${OPERATOR_PKG}/pkg/client \ ${OPERATOR_PKG}/pkg/apis \ ${RESOURCE_NAME}:v1alpha1 \ - --output-base "$(dirname ${BASH_SOURCE})/../../../.." \ + --output-base "$(dirname ${BASH_SOURCE})/../../../../.." \ --go-header-file ${SCRIPT_ROOT}/hack/boilerplate.go.txt # To use your own boilerplate text use: diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch.go index 37a54dfffe..8d542e2d0f 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/golang/protobuf/jsonpb" + "google.golang.org/protobuf/proto" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" ) @@ -31,9 +32,7 @@ func (in *BooleanExpression) UnmarshalJSON(b []byte) error { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BooleanExpression) DeepCopyInto(out *BooleanExpression) { - *out = *in - // We do not manipulate the object, so its ok - // Once we figure out the autogenerate story we can replace this + out.BooleanExpression = proto.Clone(in.BooleanExpression).(*core.BooleanExpression) } type IfBlock struct { @@ -49,11 +48,35 @@ func (in *IfBlock) GetThenNode() *NodeID { return in.ThenNode } +type Error struct { + *core.Error +} + +func (in *Error) UnmarshalJSON(b []byte) error { + in.Error = &core.Error{} + return jsonpb.Unmarshal(bytes.NewReader(b), in.Error) +} + +func (in Error) MarshalJSON() ([]byte, error) { + if in.Error == nil { + return nilJSON, nil + } + var buf bytes.Buffer + if err := marshaler.Marshal(&buf, in.Error); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (in *Error) DeepCopyInto(out *Error) { + out.Error = proto.Clone(in.Error).(*core.Error) +} + type BranchNodeSpec struct { - If IfBlock `json:"if"` - ElseIf []*IfBlock `json:"elseIf,omitempty"` - Else *NodeID `json:"else,omitempty"` - ElseFail *core.Error `json:"elseFail,omitempty"` + If IfBlock `json:"if"` + ElseIf []*IfBlock `json:"elseIf,omitempty"` + Else *NodeID `json:"else,omitempty"` + ElseFail *Error `json:"elseFail,omitempty"` } func (in *BranchNodeSpec) GetIf() ExecutableIfBlock { @@ -73,5 +96,8 @@ func (in *BranchNodeSpec) GetElseIf() []ExecutableIfBlock { } func (in *BranchNodeSpec) GetElseFail() *core.Error { - return in.ElseFail + if in.ElseFail == nil { + return nil + } + return in.ElseFail.Error } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch_test.go index 5fd2a14218..7e030b07bc 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" ) @@ -30,8 +31,10 @@ func TestBranchNodeSpecMethods(t *testing.T) { boolExpr := &core.BooleanExpression{} // Creating an Error instance for testing - errorMessage := &core.Error{ - Message: "Test error", + errorMessage := &Error{ + Error: &core.Error{ + Message: "Test error", + }, } ifNode := NodeID("ifNode") @@ -71,8 +74,73 @@ func TestBranchNodeSpecMethods(t *testing.T) { assert.Equal(t, boolExpr, elifs[0].GetCondition()) assert.Equal(t, &elifNode, elifs[0].GetThenNode()) - assert.Equal(t, errorMessage, branchNodeSpec.GetElseFail()) + assert.True(t, proto.Equal(errorMessage.Error, branchNodeSpec.GetElseFail())) branchNodeSpec.ElseFail = nil assert.Nil(t, branchNodeSpec.GetElseFail()) } + +func TestWrappedBooleanExpressionDeepCopy(t *testing.T) { + // 1. Set up proto + protoBoolExpr := &core.BooleanExpression{ + Expr: &core.BooleanExpression_Comparison{ + Comparison: &core.ComparisonExpression{ + Operator: core.ComparisonExpression_GT, + LeftValue: &core.Operand{ + Val: &core.Operand_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 10, + }, + }, + }, + }, + RightValue: &core.Operand{ + Val: &core.Operand_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 11, + }, + }, + }, + }, + }, + }, + } + + // 2. Define the wrapper object + boolExpr := BooleanExpression{ + BooleanExpression: protoBoolExpr, + } + + // 3. Deep copy the wrapper object + copyBoolExpr := boolExpr.DeepCopy() + + // 4. Compare the pointers and the actual values + // Assert that the pointers are different + assert.True(t, boolExpr.BooleanExpression != copyBoolExpr.BooleanExpression) + + // Assert that the values stored in the proto messages are equal + assert.True(t, proto.Equal(boolExpr.BooleanExpression, copyBoolExpr.BooleanExpression)) +} + +func TestWrappedErrorDeepCopy(t *testing.T) { + // 1. Set up proto + protoError := &core.Error{ + Message: "an error", + } + + // 2. Define the wrapper object + error := Error{ + Error: protoError, + } + + // 3. Deep copy the wrapper object + errorCopy := error.DeepCopy() + + // 4. Assert that the pointers are different + assert.True(t, error.Error != errorCopy.Error) + + // 5. Assert that the values stored in the proto messages are equal + assert.True(t, proto.Equal(error.Error, errorCopy.Error)) +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error.go index 39ec19c165..0aee9510cd 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/golang/protobuf/jsonpb" + "google.golang.org/protobuf/proto" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" ) @@ -30,5 +31,5 @@ func (in *ExecutionError) MarshalJSON() ([]byte, error) { } func (in *ExecutionError) DeepCopyInto(out *ExecutionError) { - *out = *in + out.ExecutionError = proto.Clone(in.ExecutionError).(*core.ExecutionError) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error_test.go index 4e0968205d..fc0d35f9cf 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "testing" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -28,3 +29,20 @@ func TestExecutionErrorJSONMarshalling(t *testing.T) { assert.Equal(t, execError.Message, newExecErr.ExecutionError.Message) assert.Equal(t, execError.ErrorUri, newExecErr.ExecutionError.ErrorUri) } + +func TestExecutionErrorDeepCopy(t *testing.T) { + execError := &core.ExecutionError{ + Code: "TestCode", + Message: "Test error message", + ErrorUri: "Test error uri", + } + + execErr := &ExecutionError{ExecutionError: execError} + newExecErr := execErr.DeepCopy() + + // 4. Compare the pointers and the actual values + // Assert that the pointers are different + assert.True(t, execErr.ExecutionError != newExecErr.ExecutionError) + // Assert that the values stored in the proto messages are equal + assert.True(t, proto.Equal(execErr.ExecutionError, newExecErr.ExecutionError)) +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go index 30cd9fa0de..e8023d68d8 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go @@ -1,6 +1,7 @@ package v1alpha1 import ( + "google.golang.org/protobuf/proto" "k8s.io/apimachinery/pkg/api/resource" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" @@ -15,7 +16,7 @@ type RawOutputDataConfig struct { } func (in *RawOutputDataConfig) DeepCopyInto(out *RawOutputDataConfig) { - *out = *in + out.RawOutputDataConfig = proto.Clone(in.RawOutputDataConfig).(*admin.RawOutputDataConfig) } // This contains workflow-execution specifications and overrides. diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config_test.go index b3751b4268..e69b354bf0 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config_test.go @@ -4,8 +4,11 @@ import ( "testing" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" + "k8s.io/apimachinery/pkg/api/resource" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" ) func TestRawOutputConfig(t *testing.T) { @@ -14,3 +17,55 @@ func TestRawOutputConfig(t *testing.T) { }} assert.Equal(t, "s3://bucket", r.OutputLocationPrefix) } + +func TestWrappedRawOutputConfigDeepCopy(t *testing.T) { + rawOutputDataConfig := RawOutputDataConfig{&admin.RawOutputDataConfig{ + OutputLocationPrefix: "s3://bucket", + }} + rawOutputDataConfigCopy := rawOutputDataConfig.DeepCopy() + + assert.True(t, rawOutputDataConfig.RawOutputDataConfig != rawOutputDataConfigCopy.RawOutputDataConfig) + assert.True(t, proto.Equal(rawOutputDataConfig.RawOutputDataConfig, rawOutputDataConfigCopy.RawOutputDataConfig)) +} + +func TestExecutionConfigDeepCopy(t *testing.T) { + // 1. Create an ExecutionConfig object (including the wrapper object - WorkflowExecutionIdentifier) + interruptible := true + executionConfig := &ExecutionConfig{ + TaskPluginImpls: map[string]TaskPluginOverride{}, + MaxParallelism: 32, + RecoveryExecution: WorkflowExecutionIdentifier{ + WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Org: "organization", + Name: "name", + }, + }, + TaskResources: TaskResources{ + Requests: TaskResourceSpec{ + CPU: resource.Quantity{ + Format: "1", + }, + Memory: resource.Quantity{ + Format: "1Gi", + }, + }, + }, + Interruptible: &interruptible, + OverwriteCache: false, + EnvironmentVariables: map[string]string{ + "key": "value", + }, + } + + // 2. Deep copy the wrapper object + executionConfigCopy := executionConfig.DeepCopy() + + // 3. Assert that the pointers are different + assert.True(t, executionConfig.RecoveryExecution.WorkflowExecutionIdentifier != executionConfigCopy.RecoveryExecution.WorkflowExecutionIdentifier) + + // 4. Assert that the values are the same + assert.True(t, proto.Equal(executionConfig.RecoveryExecution.WorkflowExecutionIdentifier, executionConfigCopy.RecoveryExecution.WorkflowExecutionIdentifier)) + assert.Equal(t, executionConfig, executionConfigCopy) +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/gate.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/gate.go index a7ffa799fa..4615580d81 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/gate.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/gate.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/golang/protobuf/jsonpb" + "google.golang.org/protobuf/proto" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" ) @@ -36,6 +37,10 @@ func (in ApproveCondition) MarshalJSON() ([]byte, error) { return buf.Bytes(), nil } +func (in *ApproveCondition) DeepCopyInto(out *ApproveCondition) { + out.ApproveCondition = proto.Clone(in.ApproveCondition).(*core.ApproveCondition) +} + func (in *ApproveCondition) UnmarshalJSON(b []byte) error { in.ApproveCondition = &core.ApproveCondition{} return jsonpb.Unmarshal(bytes.NewReader(b), in.ApproveCondition) @@ -62,6 +67,10 @@ func (in *SignalCondition) UnmarshalJSON(b []byte) error { return jsonpb.Unmarshal(bytes.NewReader(b), in.SignalCondition) } +func (in *SignalCondition) DeepCopyInto(out *SignalCondition) { + out.SignalCondition = proto.Clone(in.SignalCondition).(*core.SignalCondition) +} + type SleepCondition struct { *core.SleepCondition } @@ -83,6 +92,10 @@ func (in *SleepCondition) UnmarshalJSON(b []byte) error { return jsonpb.Unmarshal(bytes.NewReader(b), in.SleepCondition) } +func (in *SleepCondition) DeepCopyInto(out *SleepCondition) { + out.SleepCondition = proto.Clone(in.SleepCondition).(*core.SleepCondition) +} + type GateNodeSpec struct { Kind ConditionKind `json:"kind"` Approve *ApproveCondition `json:"approve,omitempty"` diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/identifier.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/identifier.go index 7d6a3622c8..9bd11e6599 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/identifier.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/identifier.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/golang/protobuf/jsonpb" + "google.golang.org/protobuf/proto" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" ) @@ -26,7 +27,7 @@ func (in *Identifier) MarshalJSON() ([]byte, error) { } func (in *Identifier) DeepCopyInto(out *Identifier) { - *out = *in + out.Identifier = proto.Clone(in.Identifier).(*core.Identifier) } type WorkflowExecutionIdentifier struct { @@ -34,7 +35,7 @@ type WorkflowExecutionIdentifier struct { } func (in *WorkflowExecutionIdentifier) DeepCopyInto(out *WorkflowExecutionIdentifier) { - *out = *in + out.WorkflowExecutionIdentifier = proto.Clone(in.WorkflowExecutionIdentifier).(*core.WorkflowExecutionIdentifier) } type TaskExecutionIdentifier struct { @@ -42,5 +43,5 @@ type TaskExecutionIdentifier struct { } func (in *TaskExecutionIdentifier) DeepCopyInto(out *TaskExecutionIdentifier) { - *out = *in + out.TaskExecutionIdentifier = proto.Clone(in.TaskExecutionIdentifier).(*core.TaskExecutionIdentifier) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index aab034224d..c005d3ea5a 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -111,10 +111,10 @@ func (in *DynamicNodeStatus) GetDynamicNodeReason() string { } func (in *DynamicNodeStatus) GetExecutionError() *core.ExecutionError { - if in.Error == nil { - return nil + if in.Error != nil { + return in.Error.ExecutionError } - return in.Error.ExecutionError + return nil } func (in *DynamicNodeStatus) GetIsFailurePermanent() bool { @@ -137,9 +137,7 @@ func (in *DynamicNodeStatus) SetDynamicNodePhase(phase DynamicNodePhase) { func (in *DynamicNodeStatus) SetExecutionError(err *core.ExecutionError) { if err != nil { - in.Error = &ExecutionError{ExecutionError: err} - } else { - in.Error = nil + in.Error = &ExecutionError{err} } } @@ -170,19 +168,22 @@ const ( type WorkflowNodeStatus struct { MutableStruct - Phase WorkflowNodePhase `json:"phase,omitempty"` - ExecutionError *core.ExecutionError `json:"executionError,omitempty"` + Phase WorkflowNodePhase `json:"phase,omitempty"` + ExecutionError *ExecutionError `json:"executionError,omitempty"` } func (in *WorkflowNodeStatus) SetExecutionError(executionError *core.ExecutionError) { - if in.ExecutionError != executionError { + if in.ExecutionError != nil && in.ExecutionError.ExecutionError != executionError { in.SetDirty() - in.ExecutionError = executionError + in.ExecutionError.ExecutionError = executionError } } func (in *WorkflowNodeStatus) GetExecutionError() *core.ExecutionError { - return in.ExecutionError + if in.ExecutionError != nil { + return in.ExecutionError.ExecutionError + } + return nil } func (in *WorkflowNodeStatus) GetWorkflowNodePhase() WorkflowNodePhase { @@ -231,7 +232,7 @@ const ( type ArrayNodeStatus struct { MutableStruct Phase ArrayNodePhase `json:"phase,omitempty"` - ExecutionError *core.ExecutionError `json:"executionError,omitempty"` + ExecutionError *ExecutionError `json:"executionError,omitempty"` SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"` SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"` SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"` @@ -251,13 +252,16 @@ func (in *ArrayNodeStatus) SetArrayNodePhase(phase ArrayNodePhase) { } func (in *ArrayNodeStatus) GetExecutionError() *core.ExecutionError { - return in.ExecutionError + if in.ExecutionError != nil { + return in.ExecutionError.ExecutionError + } + return nil } func (in *ArrayNodeStatus) SetExecutionError(executionError *core.ExecutionError) { - if in.ExecutionError != executionError { + if in.ExecutionError != nil && in.ExecutionError.ExecutionError != executionError { in.SetDirty() - in.ExecutionError = executionError + in.ExecutionError.ExecutionError = executionError } } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go index 0278d62f55..e05c1592f4 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go @@ -187,7 +187,7 @@ func TestDynamicNodeStatus_SetExecutionError(t *testing.T) { } in.SetExecutionError(tt.NewError) if tt.NewError == nil { - assert.Nil(t, in.Error) + assert.Nil(t, in.Error.ExecutionError) } else { assert.NotNil(t, in.Error) assert.Equal(t, tt.NewError, in.Error.ExecutionError) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/testdata/workflowspec.yaml b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/testdata/workflowspec.yaml index c83f8182b9..21fc3ad694 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/testdata/workflowspec.yaml +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/testdata/workflowspec.yaml @@ -1,207 +1,88 @@ -workflow.apiVersion: flyte.lyft.com/v1alpha1 kind: flyteworkflow metadata: - creationTimestamp: null generateName: dummy-workflow-1-0- + creationTimestamp: null labels: execution-id: "" workflow-id: dummy-workflow-1-0 -inputs: - literals: - triggered_date: - scalar: - primitive: - datetime: 2018-08-01T18:09:18Z spec: - connections: - add-one-and-print-0: - - sum-non-none-0 - add-one-and-print-1: - - add-one-and-print-2 - - sum-and-print-0 - add-one-and-print-2: - - sum-and-print-0 - add-one-and-print-3: - - sum-non-none-0 - start-node: - - add-one-and-print-0 - - add-one-and-print-3 - - print-every-time-0 - sum-and-print-0: - - end-node - - print-every-time-0 - sum-non-none-0: - - add-one-and-print-1 - - sum-and-print-0 id: dummy-workflow-1-0 nodes: add-one-and-print-0: - activeDeadlineSeconds: 0 id: add-one-and-print-0 - input_bindings: - - binding: - scalar: - primitive: - integer: "3" - var: value_to_print - kind: task resources: {} - status: - phase: 0 - task_ref: add-one-and-print + kind: task add-one-and-print-1: - activeDeadlineSeconds: 0 id: add-one-and-print-1 - input_bindings: - - binding: - promise: - nodeId: sum-non-none-0 - var: out - var: value_to_print - kind: task resources: {} - status: - phase: 0 - task_ref: add-one-and-print + kind: task add-one-and-print-2: - activeDeadlineSeconds: 0 id: add-one-and-print-2 - input_bindings: - - binding: - promise: - nodeId: add-one-and-print-1 - var: out - var: value_to_print - kind: task resources: {} - status: - phase: 0 - task_ref: add-one-and-print + kind: task add-one-and-print-3: - activeDeadlineSeconds: 0 id: add-one-and-print-3 - input_bindings: - - binding: - scalar: - primitive: - integer: "101" - var: value_to_print - kind: task resources: {} - status: - phase: 0 - task_ref: add-one-and-print + kind: task end-node: id: end-node - input_bindings: - - binding: - promise: - nodeId: sum-and-print-0 - var: out - var: output - kind: end resources: {} - status: - phase: 0 + kind: end print-every-time-0: - activeDeadlineSeconds: 0 id: print-every-time-0 - input_bindings: - - binding: - promise: - nodeId: start-node - var: triggered_date - var: date_triggered - - binding: - promise: - nodeId: sum-and-print-0 - var: out_blob - var: in_blob - - binding: - promise: - nodeId: sum-and-print-0 - var: multi_blob - var: multi_blob - - binding: - promise: - nodeId: sum-and-print-0 - var: out - var: value_to_print - kind: task resources: {} - status: - phase: 0 - task_ref: print-every-time + kind: task start-node: id: start-node - kind: start resources: {} - status: - phase: 0 + kind: start sum-and-print-0: - activeDeadlineSeconds: 0 id: sum-and-print-0 - input_bindings: - - binding: - collection: - bindings: - - promise: - nodeId: sum-non-none-0 - var: out - - promise: - nodeId: add-one-and-print-1 - var: out - - promise: - nodeId: add-one-and-print-2 - var: out - - scalar: - primitive: - integer: "100" - var: values_to_add - kind: task resources: {} - status: - phase: 0 - task_ref: sum-and-print + kind: task sum-non-none-0: - activeDeadlineSeconds: 0 id: sum-non-none-0 - input_bindings: - - binding: - collection: - bindings: - - promise: - nodeId: add-one-and-print-0 - var: out - - promise: - nodeId: add-one-and-print-3 - var: out - var: values_to_print - kind: task resources: {} - status: - phase: 0 - task_ref: sum-non-none -status: - phase: 0 + kind: task + connections: + add-one-and-print-0: + - sum-non-none-0 + add-one-and-print-1: + - add-one-and-print-2 + - sum-and-print-0 + add-one-and-print-2: + - sum-and-print-0 + add-one-and-print-3: + - sum-non-none-0 + start-node: + - add-one-and-print-0 + - add-one-and-print-3 + - print-every-time-0 + sum-and-print-0: + - end-node + - print-every-time-0 + sum-non-none-0: + - add-one-and-print-1 + - sum-and-print-0 + edges: + downstream: null + upstream: null +inputs: + literals: + triggered_date: + scalar: + primitive: + datetime: "2018-08-01T18:09:18Z" +executionId: {} tasks: add-one-and-print: - container: - args: - - --task-module=flytekit.examples.tasks - - --task-name=add_one_and_print - - --inputs={{$input}} - - --output-prefix={{$output}} - command: - - flyte-python-entrypoint - image: myflyteimage:abc123 - resources: - requests: - - value: "0.000" - - value: "2.000" - - value: 2048Mi id: name: add-one-and-print + type: "7" + metadata: + runtime: + version: 1.19.0b7 + timeout: 0s interface: inputs: variables: @@ -213,8 +94,52 @@ tasks: out: type: simple: INTEGER - metadata: - runtime: - version: 1.19.0b7 - timeout: 0s - type: "7" + container: + image: myflyteimage:abc123 + command: + - flyte-python-entrypoint + args: + - --task-module=flytekit.examples.tasks + - --task-name=add_one_and_print + - --inputs={{$input}} + - --output-prefix={{$output}} + resources: + requests: + - value: "0.000" + - value: "2.000" + - value: 2048Mi +node-defaults: {} +securityContext: + run_as: + iam_role: abc-def + k8s_service_account: service-account + oauth2_client: + client_id: client-id + client_secret: + group: group + group_version: group-version + key: key + execution_identity: execution-identity +status: + phase: 0 +rawOutputDataConfig: {} +executionConfig: + TaskPluginImpls: null + MaxParallelism: 0 + RecoveryExecution: {} + TaskResources: + Requests: + CPU: "0" + Memory: "0" + EphemeralStorage: "0" + Storage: "0" + GPU: "0" + Limits: + CPU: "0" + Memory: "0" + EphemeralStorage: "0" + Storage: "0" + GPU: "0" + Interruptible: null + OverwriteCache: false + EnvironmentVariables: null diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 225a49ac3f..cab4eeae37 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -7,6 +7,7 @@ import ( "github.com/golang/protobuf/jsonpb" "github.com/pkg/errors" + "google.golang.org/protobuf/proto" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -58,7 +59,7 @@ type FlyteWorkflow struct { ServiceAccountName string `json:"serviceAccountName,omitempty" protobuf:"bytes,8,opt,name=serviceAccountName"` // Security context fields to define privilege and access control settings // +optional - SecurityContext core.SecurityContext `json:"securityContext,omitempty" protobuf:"bytes,12,rep,name=securityContext"` + SecurityContext SecurityContext `json:"securityContext,omitempty" protobuf:"bytes,12,rep,name=securityContext"` // Status is the only mutable section in the workflow. It holds all the execution information Status WorkflowStatus `json:"status,omitempty"` // RawOutputDataConfig defines the configurations to use for generating raw outputs (e.g. blobs, schemas). @@ -79,8 +80,16 @@ type FlyteWorkflow struct { WorkflowClosureReference DataReference `json:"workflowClosureReference,omitempty"` } +type SecurityContext struct { + core.SecurityContext +} + +func (in *SecurityContext) DeepCopyInto(out *SecurityContext) { + out.SecurityContext = *proto.Clone(&in.SecurityContext).(*core.SecurityContext) +} + func (in *FlyteWorkflow) GetSecurityContext() core.SecurityContext { - return in.SecurityContext + return in.SecurityContext.SecurityContext } func (in *FlyteWorkflow) GetEventVersion() EventVersion { @@ -206,11 +215,8 @@ func (in *Inputs) MarshalJSON() ([]byte, error) { return buf.Bytes(), nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Inputs) DeepCopyInto(out *Inputs) { - *out = *in - // We do not manipulate the object, so its ok - // Once we figure out the autogenerate story we can replace this + out.LiteralMap = proto.Clone(in.LiteralMap).(*core.LiteralMap) } // Deprecated: Please use Connections instead diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow_test.go index 6e72b95ec7..a18b4ef020 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow_test.go @@ -4,12 +4,18 @@ import ( "encoding/json" "io/ioutil" "testing" + "time" "github.com/ghodss/yaml" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytestdlib/storage" ) func TestMarshalUnmarshal_Connections(t *testing.T) { @@ -51,6 +57,195 @@ func TestWorkflowSpec(t *testing.T) { assert.Nil(t, w.GetOnFailureNode()) assert.Equal(t, 7, len(w.GetConnections().Downstream)) assert.Equal(t, 8, len(w.GetConnections().Upstream)) + expectedSecurityContext := v1alpha1.SecurityContext{ + core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: "abc-def", + K8SServiceAccount: "service-account", + Oauth2Client: &core.OAuth2Client{ + ClientId: "client-id", + ClientSecret: &core.Secret{ + Group: "group", + GroupVersion: "group-version", + Key: "key", + }, + }, + ExecutionIdentity: "execution-identity", + }, + }, + } + securityContext := w.GetSecurityContext() + assert.True(t, proto.Equal(&securityContext, &expectedSecurityContext)) +} + +func TestWrappedValuesInFlyteWorkflow(t *testing.T) { + j, err := ReadYamlFileAsJSON("testdata/workflowspec.yaml") + assert.NoError(t, err) + w := &v1alpha1.FlyteWorkflow{} + err = json.Unmarshal(j, w) + if !assert.NoError(t, err) { + t.FailNow() + } + + wCopy := w.DeepCopy() + + wSecurityContext := w.GetSecurityContext() + wCopySecurityContext := wCopy.GetSecurityContext() + assert.True(t, proto.Equal(&wSecurityContext, &wCopySecurityContext)) + +} + +func TestWorkflow(t *testing.T) { + // Instantiate a new workflow, but be as comprehensive as possible. + // This is a bit of a pain, but it's the only way to ensure that the defaults are set correctly. + w := &v1alpha1.FlyteWorkflow{ + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: v1alpha1.WorkflowID("some-id"), + Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{}, + Connections: v1alpha1.Connections{ + Downstream: map[v1alpha1.NodeID][]v1alpha1.NodeID{}, + Upstream: map[v1alpha1.NodeID][]v1alpha1.NodeID{}, + }, + }, + WorkflowMeta: &v1alpha1.WorkflowMeta{ + EventVersion: 42, + }, + Inputs: &v1alpha1.Inputs{ + &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "input1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 1, + }, + }, + }, + }, + }, + }, + }, + }, + }, + ExecutionID: v1alpha1.WorkflowExecutionIdentifier{ + &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + Org: "org", + }, + }, + Tasks: map[v1alpha1.TaskID]*v1alpha1.TaskSpec{ + "task1": &v1alpha1.TaskSpec{ + &core.TaskTemplate{ + Id: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "project", + Domain: "domain", + Name: "name", + Version: "version", + }, + Interface: &core.TypedInterface{ + Inputs: &core.VariableMap{ + Variables: map[string]*core.Variable{ + "input1": { + Type: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + }, + }, + }, + }, + }, + ActiveDeadlineSeconds: proto.Int64(42), + NodeDefaults: v1alpha1.NodeDefaults{ + Interruptible: true, + }, + AcceptedAt: &metav1.Time{ + Time: time.Now(), + }, + SecurityContext: v1alpha1.SecurityContext{ + core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: "abc-def", + K8SServiceAccount: "service-account", + Oauth2Client: &core.OAuth2Client{ + ClientId: "client-id", + ClientSecret: &core.Secret{ + Group: "group", + GroupVersion: "group-version", + Key: "key", + }, + }, + ExecutionIdentity: "execution-identity", + }, + }, + }, + Status: v1alpha1.WorkflowStatus{ + Phase: v1alpha1.WorkflowPhase(41), + StartedAt: &metav1.Time{ + Time: time.Now(), + }, + StoppedAt: &metav1.Time{ + Time: time.Now(), + }, + LastUpdatedAt: &metav1.Time{ + Time: time.Now(), + }, + Message: "message", + DataDir: storage.DataReference("data-dir"), + OutputReference: storage.DataReference("output-reference"), + NodeStatus: map[string]*v1alpha1.NodeStatus{}, + FailedAttempts: 32, + Error: &v1alpha1.ExecutionError{ + &core.ExecutionError{ + Code: "code", + Message: "message", + }, + }, + }, + RawOutputDataConfig: v1alpha1.RawOutputDataConfig{ + &admin.RawOutputDataConfig{ + OutputLocationPrefix: "output-location-prefix", + }, + }, + ExecutionConfig: v1alpha1.ExecutionConfig{ + TaskPluginImpls: map[string]v1alpha1.TaskPluginOverride{}, + MaxParallelism: 42, + RecoveryExecution: v1alpha1.WorkflowExecutionIdentifier{ + &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + Org: "org", + }, + }, + }, + DataReferenceConstructor: storage.URLPathConstructor{}, + WorkflowClosureReference: storage.DataReference("data-reference"), + } + + wCopy := w.DeepCopy() + + // Test that the pointers are different. + assert.NotSame(t, w, wCopy) + + assert.True(t, w.WorkflowSpec != wCopy.WorkflowSpec) + assert.Equal(t, w.WorkflowSpec, wCopy.WorkflowSpec) + assert.True(t, w.WorkflowMeta != wCopy.WorkflowMeta) + assert.Equal(t, w.WorkflowMeta, wCopy.WorkflowMeta) + assert.True(t, w.Inputs != wCopy.Inputs) + assert.Equal(t, w.Inputs, wCopy.Inputs) + + wSecurityContext := w.GetSecurityContext() + wCopySecurityContext := wCopy.GetSecurityContext() + assert.True(t, proto.Equal(&wSecurityContext, &wCopySecurityContext)) } func TestWorkflowIsInterruptible(t *testing.T) { @@ -79,3 +274,38 @@ func TestWorkflowIsInterruptible(t *testing.T) { w.NodeDefaults.Interruptible = true assert.True(t, w.IsInterruptible()) } + +func TestWrappedInputsDeepCopy(t *testing.T) { + // 1. Setup proto + litMap := core.LiteralMap{ + Literals: map[string]*core.Literal{ + "p1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 1, + }, + }, + }, + }, + }, + }, + }, + } + + // 2. Define wrapper + inputs := v1alpha1.Inputs{ + &litMap, + } + + // 3. Deep copy + inputsCopy := inputs.DeepCopy() + + // 4. Assert that pointers are different + assert.True(t, inputs.LiteralMap != inputsCopy.LiteralMap) + + // 5. Assert that the content is the same + assert.True(t, proto.Equal(inputs.LiteralMap, inputsCopy.LiteralMap)) +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go index febbca733c..a754ede868 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go @@ -6,7 +6,6 @@ package v1alpha1 import ( - core "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" @@ -22,6 +21,77 @@ func (in *Alias) DeepCopy() *Alias { return out } +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApproveCondition. +func (in *ApproveCondition) DeepCopy() *ApproveCondition { + if in == nil { + return nil + } + out := new(ApproveCondition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ArrayNodeSpec) DeepCopyInto(out *ArrayNodeSpec) { + *out = *in + if in.SubNodeSpec != nil { + in, out := &in.SubNodeSpec, &out.SubNodeSpec + *out = new(NodeSpec) + (*in).DeepCopyInto(*out) + } + if in.Parallelism != nil { + in, out := &in.Parallelism, &out.Parallelism + *out = new(uint32) + **out = **in + } + if in.MinSuccesses != nil { + in, out := &in.MinSuccesses, &out.MinSuccesses + *out = new(uint32) + **out = **in + } + if in.MinSuccessRatio != nil { + in, out := &in.MinSuccessRatio, &out.MinSuccessRatio + *out = new(float32) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArrayNodeSpec. +func (in *ArrayNodeSpec) DeepCopy() *ArrayNodeSpec { + if in == nil { + return nil + } + out := new(ArrayNodeSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ArrayNodeStatus) DeepCopyInto(out *ArrayNodeStatus) { + *out = *in + out.MutableStruct = in.MutableStruct + if in.ExecutionError != nil { + in, out := &in.ExecutionError, &out.ExecutionError + *out = (*in).DeepCopy() + } + in.SubNodePhases.DeepCopyInto(&out.SubNodePhases) + in.SubNodeTaskPhases.DeepCopyInto(&out.SubNodeTaskPhases) + in.SubNodeRetryAttempts.DeepCopyInto(&out.SubNodeRetryAttempts) + in.SubNodeSystemFailures.DeepCopyInto(&out.SubNodeSystemFailures) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArrayNodeStatus. +func (in *ArrayNodeStatus) DeepCopy() *ArrayNodeStatus { + if in == nil { + return nil + } + out := new(ArrayNodeStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Binding. func (in *Binding) DeepCopy() *Binding { if in == nil { @@ -64,7 +134,7 @@ func (in *BranchNodeSpec) DeepCopyInto(out *BranchNodeSpec) { } if in.ElseFail != nil { in, out := &in.ElseFail, &out.ElseFail - *out = *in + *out = (*in).DeepCopy() } return } @@ -142,6 +212,16 @@ func (in *DynamicNodeStatus) DeepCopy() *DynamicNodeStatus { return out } +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Error. +func (in *Error) DeepCopy() *Error { + if in == nil { + return nil + } + out := new(Error) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExecutionConfig) DeepCopyInto(out *ExecutionConfig) { *out = *in @@ -152,7 +232,20 @@ func (in *ExecutionConfig) DeepCopyInto(out *ExecutionConfig) { (*out)[key] = *val.DeepCopy() } } - out.MaxParallelism = in.MaxParallelism + in.RecoveryExecution.DeepCopyInto(&out.RecoveryExecution) + in.TaskResources.DeepCopyInto(&out.TaskResources) + if in.Interruptible != nil { + in, out := &in.Interruptible, &out.Interruptible + *out = new(bool) + **out = **in + } + if in.EnvironmentVariables != nil { + in, out := &in.EnvironmentVariables, &out.EnvironmentVariables + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } return } @@ -176,6 +269,16 @@ func (in *ExecutionError) DeepCopy() *ExecutionError { return out } +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtendedResources. +func (in *ExtendedResources) DeepCopy() *ExtendedResources { + if in == nil { + return nil + } + out := new(ExtendedResources) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FlyteWorkflow) DeepCopyInto(out *FlyteWorkflow) { *out = *in @@ -235,13 +338,12 @@ func (in *FlyteWorkflow) DeepCopyInto(out *FlyteWorkflow) { in, out := &in.AcceptedAt, &out.AcceptedAt *out = (*in).DeepCopy() } - - out.SecurityContext = in.SecurityContext + in.SecurityContext.DeepCopyInto(&out.SecurityContext) in.Status.DeepCopyInto(&out.Status) in.RawOutputDataConfig.DeepCopyInto(&out.RawOutputDataConfig) in.ExecutionConfig.DeepCopyInto(&out.ExecutionConfig) if in.DataReferenceConstructor != nil { - out.DataReferenceConstructor = in.DataReferenceConstructor + out.DataReferenceConstructor = in.DataReferenceConstructor.DeepCopyReferenceConstructor() } return } @@ -297,6 +399,51 @@ func (in *FlyteWorkflowList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GateNodeSpec) DeepCopyInto(out *GateNodeSpec) { + *out = *in + if in.Approve != nil { + in, out := &in.Approve, &out.Approve + *out = (*in).DeepCopy() + } + if in.Signal != nil { + in, out := &in.Signal, &out.Signal + *out = (*in).DeepCopy() + } + if in.Sleep != nil { + in, out := &in.Sleep, &out.Sleep + *out = (*in).DeepCopy() + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GateNodeSpec. +func (in *GateNodeSpec) DeepCopy() *GateNodeSpec { + if in == nil { + return nil + } + out := new(GateNodeSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GateNodeStatus) DeepCopyInto(out *GateNodeStatus) { + *out = *in + out.MutableStruct = in.MutableStruct + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GateNodeStatus. +func (in *GateNodeStatus) DeepCopy() *GateNodeStatus { + if in == nil { + return nil + } + out := new(GateNodeStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Identifier. func (in *Identifier) DeepCopy() *Identifier { if in == nil { @@ -404,6 +551,16 @@ func (in *NodeSpec) DeepCopyInto(out *NodeSpec) { *out = new(WorkflowNodeSpec) (*in).DeepCopyInto(*out) } + if in.GateNode != nil { + in, out := &in.GateNode, &out.GateNode + *out = new(GateNodeSpec) + (*in).DeepCopyInto(*out) + } + if in.ArrayNode != nil { + in, out := &in.ArrayNode, &out.ArrayNode + *out = new(ArrayNodeSpec) + (*in).DeepCopyInto(*out) + } if in.InputBindings != nil { in, out := &in.InputBindings, &out.InputBindings *out = make([]*Binding, len(*in)) @@ -446,6 +603,10 @@ func (in *NodeSpec) DeepCopyInto(out *NodeSpec) { *out = new(v1.Affinity) (*in).DeepCopyInto(*out) } + if in.ExtendedResources != nil { + in, out := &in.ExtendedResources, &out.ExtendedResources + *out = (*in).DeepCopy() + } if in.Tolerations != nil { in, out := &in.Tolerations, &out.Tolerations *out = make([]v1.Toleration, len(*in)) @@ -548,12 +709,22 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { *out = new(DynamicNodeStatus) (*in).DeepCopyInto(*out) } + if in.GateNodeStatus != nil { + in, out := &in.GateNodeStatus, &out.GateNodeStatus + *out = new(GateNodeStatus) + **out = **in + } + if in.ArrayNodeStatus != nil { + in, out := &in.ArrayNodeStatus, &out.ArrayNodeStatus + *out = new(ArrayNodeStatus) + (*in).DeepCopyInto(*out) + } if in.Error != nil { in, out := &in.Error, &out.Error *out = (*in).DeepCopy() } if in.DataReferenceConstructor != nil { - out.DataReferenceConstructor = in.DataReferenceConstructor + out.DataReferenceConstructor = in.DataReferenceConstructor.DeepCopyReferenceConstructor() } return } @@ -609,6 +780,36 @@ func (in *RetryStrategy) DeepCopy() *RetryStrategy { return out } +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecurityContext. +func (in *SecurityContext) DeepCopy() *SecurityContext { + if in == nil { + return nil + } + out := new(SecurityContext) + in.DeepCopyInto(out) + return out +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SignalCondition. +func (in *SignalCondition) DeepCopy() *SignalCondition { + if in == nil { + return nil + } + out := new(SignalCondition) + in.DeepCopyInto(out) + return out +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SleepCondition. +func (in *SleepCondition) DeepCopy() *SleepCondition { + if in == nil { + return nil + } + out := new(SleepCondition) + in.DeepCopyInto(out) + return out +} + // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskExecutionIdentifier. func (in *TaskExecutionIdentifier) DeepCopy() *TaskExecutionIdentifier { if in == nil { @@ -640,6 +841,45 @@ func (in *TaskPluginOverride) DeepCopy() *TaskPluginOverride { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TaskResourceSpec) DeepCopyInto(out *TaskResourceSpec) { + *out = *in + out.CPU = in.CPU.DeepCopy() + out.Memory = in.Memory.DeepCopy() + out.EphemeralStorage = in.EphemeralStorage.DeepCopy() + out.Storage = in.Storage.DeepCopy() + out.GPU = in.GPU.DeepCopy() + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskResourceSpec. +func (in *TaskResourceSpec) DeepCopy() *TaskResourceSpec { + if in == nil { + return nil + } + out := new(TaskResourceSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TaskResources) DeepCopyInto(out *TaskResources) { + *out = *in + in.Requests.DeepCopyInto(&out.Requests) + in.Limits.DeepCopyInto(&out.Limits) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskResources. +func (in *TaskResources) DeepCopy() *TaskResources { + if in == nil { + return nil + } + out := new(TaskResources) + in.DeepCopyInto(out) + return out +} + // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskSpec. func (in *TaskSpec) DeepCopy() *TaskSpec { if in == nil { @@ -707,8 +947,7 @@ func (in *WorkflowNodeStatus) DeepCopyInto(out *WorkflowNodeStatus) { out.MutableStruct = in.MutableStruct if in.ExecutionError != nil { in, out := &in.ExecutionError, &out.ExecutionError - *out = new(core.ExecutionError) - *out = *in + *out = (*in).DeepCopy() } return } @@ -743,7 +982,6 @@ func (in *WorkflowSpec) DeepCopyInto(out *WorkflowSpec) { } in.DeprecatedConnections.DeepCopyInto(&out.DeprecatedConnections) in.Connections.DeepCopyInto(&out.Connections) - if in.OnFailure != nil { in, out := &in.OnFailure, &out.OnFailure *out = new(NodeSpec) @@ -810,8 +1048,13 @@ func (in *WorkflowStatus) DeepCopyInto(out *WorkflowStatus) { in, out := &in.Error, &out.Error *out = (*in).DeepCopy() } + if in.DefinitionVersion != nil { + in, out := &in.DefinitionVersion, &out.DefinitionVersion + *out = new(WorkflowDefinitionVersion) + **out = **in + } if in.DataReferenceConstructor != nil { - out.DataReferenceConstructor = in.DataReferenceConstructor + out.DataReferenceConstructor = in.DataReferenceConstructor.DeepCopyReferenceConstructor() } return } diff --git a/flytepropeller/pkg/client/clientset/versioned/clientset.go b/flytepropeller/pkg/client/clientset/versioned/clientset.go index 93d2cabdb4..b78503d1aa 100644 --- a/flytepropeller/pkg/client/clientset/versioned/clientset.go +++ b/flytepropeller/pkg/client/clientset/versioned/clientset.go @@ -4,6 +4,7 @@ package versioned import ( "fmt" + "net/http" flyteworkflowv1alpha1 "github.com/flyteorg/flyte/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1" discovery "k8s.io/client-go/discovery" @@ -39,22 +40,45 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface { // NewForConfig creates a new Clientset for the given config. // If config's RateLimiter is not set and QPS and Burst are acceptable, // NewForConfig will generate a rate-limiter in configShallowCopy. +// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient), +// where httpClient was generated with rest.HTTPClientFor(c). func NewForConfig(c *rest.Config) (*Clientset, error) { configShallowCopy := *c + + if configShallowCopy.UserAgent == "" { + configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent() + } + + // share the transport between all clients + httpClient, err := rest.HTTPClientFor(&configShallowCopy) + if err != nil { + return nil, err + } + + return NewForConfigAndClient(&configShallowCopy, httpClient) +} + +// NewForConfigAndClient creates a new Clientset for the given config and http client. +// Note the http client provided takes precedence over the configured transport values. +// If config's RateLimiter is not set and QPS and Burst are acceptable, +// NewForConfigAndClient will generate a rate-limiter in configShallowCopy. +func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) { + configShallowCopy := *c if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { if configShallowCopy.Burst <= 0 { return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0") } configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) } + var cs Clientset var err error - cs.flyteworkflowV1alpha1, err = flyteworkflowv1alpha1.NewForConfig(&configShallowCopy) + cs.flyteworkflowV1alpha1, err = flyteworkflowv1alpha1.NewForConfigAndClient(&configShallowCopy, httpClient) if err != nil { return nil, err } - cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy) + cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfigAndClient(&configShallowCopy, httpClient) if err != nil { return nil, err } @@ -64,11 +88,11 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { // NewForConfigOrDie creates a new Clientset for the given config and // panics if there is an error in the config. func NewForConfigOrDie(c *rest.Config) *Clientset { - var cs Clientset - cs.flyteworkflowV1alpha1 = flyteworkflowv1alpha1.NewForConfigOrDie(c) - - cs.DiscoveryClient = discovery.NewDiscoveryClientForConfigOrDie(c) - return &cs + cs, err := NewForConfig(c) + if err != nil { + panic(err) + } + return cs } // New creates a new Clientset for the given RESTClient. diff --git a/flytepropeller/pkg/client/clientset/versioned/fake/clientset_generated.go b/flytepropeller/pkg/client/clientset/versioned/fake/clientset_generated.go index 7504b10cb2..4285d352ce 100644 --- a/flytepropeller/pkg/client/clientset/versioned/fake/clientset_generated.go +++ b/flytepropeller/pkg/client/clientset/versioned/fake/clientset_generated.go @@ -58,7 +58,10 @@ func (c *Clientset) Tracker() testing.ObjectTracker { return c.tracker } -var _ clientset.Interface = &Clientset{} +var ( + _ clientset.Interface = &Clientset{} + _ testing.FakeClient = &Clientset{} +) // FlyteworkflowV1alpha1 retrieves the FlyteworkflowV1alpha1Client func (c *Clientset) FlyteworkflowV1alpha1() flyteworkflowv1alpha1.FlyteworkflowV1alpha1Interface { diff --git a/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1/fake/fake_flyteworkflow.go b/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1/fake/fake_flyteworkflow.go index f7e8b0be5e..81653b47f0 100644 --- a/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1/fake/fake_flyteworkflow.go +++ b/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1/fake/fake_flyteworkflow.go @@ -101,7 +101,7 @@ func (c *FakeFlyteWorkflows) UpdateStatus(ctx context.Context, flyteWorkflow *v1 // Delete takes name of the flyteWorkflow and deletes it. Returns an error if one occurs. func (c *FakeFlyteWorkflows) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { _, err := c.Fake. - Invokes(testing.NewDeleteAction(flyteworkflowsResource, c.ns, name), &v1alpha1.FlyteWorkflow{}) + Invokes(testing.NewDeleteActionWithOptions(flyteworkflowsResource, c.ns, name, opts), &v1alpha1.FlyteWorkflow{}) return err } diff --git a/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1/flyteworkflow_client.go b/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1/flyteworkflow_client.go index 9f9029fcac..556789a1be 100644 --- a/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1/flyteworkflow_client.go +++ b/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1/flyteworkflow_client.go @@ -3,6 +3,8 @@ package v1alpha1 import ( + "net/http" + v1alpha1 "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/client/clientset/versioned/scheme" rest "k8s.io/client-go/rest" @@ -23,12 +25,28 @@ func (c *FlyteworkflowV1alpha1Client) FlyteWorkflows(namespace string) FlyteWork } // NewForConfig creates a new FlyteworkflowV1alpha1Client for the given config. +// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient), +// where httpClient was generated with rest.HTTPClientFor(c). func NewForConfig(c *rest.Config) (*FlyteworkflowV1alpha1Client, error) { config := *c if err := setConfigDefaults(&config); err != nil { return nil, err } - client, err := rest.RESTClientFor(&config) + httpClient, err := rest.HTTPClientFor(&config) + if err != nil { + return nil, err + } + return NewForConfigAndClient(&config, httpClient) +} + +// NewForConfigAndClient creates a new FlyteworkflowV1alpha1Client for the given config and http client. +// Note the http client provided takes precedence over the configured transport values. +func NewForConfigAndClient(c *rest.Config, h *http.Client) (*FlyteworkflowV1alpha1Client, error) { + config := *c + if err := setConfigDefaults(&config); err != nil { + return nil, err + } + client, err := rest.RESTClientForConfigAndClient(&config, h) if err != nil { return nil, err } diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf_crd.json b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf_crd.json index 5598c7a375..b92ad45ea0 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf_crd.json +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf_crd.json @@ -1 +1 @@ -{"kind":"flyteworkflow","apiVersion":"flyte.lyft.com/v1alpha1","metadata":{"name":"name","namespace":"namespace","creationTimestamp":null,"labels":{"domain":"domain","execution-id":"name","project":"hello","shard-key":"6","workflow-name":"core-control-flow-conditions-multiplier-2"}},"spec":{"id":"::core.control_flow.conditions.multiplier_2","nodes":{"end-node":{"id":"end-node","resources":{},"kind":"end","inputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"n0":{"id":"n0","name":"fractions","resources":{},"kind":"branch","branch":{"if":{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}}}},"then":"n0-n0"},"elseIf":[{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}},"rightExpression":{"comparison":{"operator":"LTE","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":10}}}}}},"then":"n0-n1"}],"elseFail":{"failed_node_id":"fractions","message":"The input must be between 0 and 10"}},"inputBindings":[{"var":".my_input","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0":{"id":"n0-n0","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n1":{"id":"n0-n1","name":"square","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.square\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"start-node":{"id":"start-node","resources":{},"kind":"start"}},"connections":{"n0":["end-node"],"start-node":["n0"]},"edges":{"downstream":{"n0":["end-node"],"start-node":["n0"]},"upstream":{"end-node":["n0"],"n0":["start-node"],"n0-n0":["start-node"],"n0-n1":["start-node"]}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}},"outputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"inputs":{"literals":{"my_input":{"scalar":{"primitive":{"floatValue":0}}}}},"executionId":{},"tasks":{"resource_type:TASK name:\"core.control_flow.conditions.double\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.double"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","double"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}},"resource_type:TASK name:\"core.control_flow.conditions.square\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.square"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","square"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}}},"node-defaults":{},"securityContext":{},"status":{"phase":0},"rawOutputDataConfig":{},"executionConfig":{"TaskPluginImpls":null,"MaxParallelism":0,"RecoveryExecution":{},"TaskResources":{"Requests":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"},"Limits":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"}},"Interruptible":null,"OverwriteCache":false,"EnvironmentVariables":null}} \ No newline at end of file +{"kind":"flyteworkflow","apiVersion":"flyte.lyft.com/v1alpha1","metadata":{"name":"name","namespace":"namespace","creationTimestamp":null,"labels":{"domain":"domain","execution-id":"name","project":"hello","shard-key":"6","workflow-name":"core-control-flow-conditions-multiplier-2"}},"spec":{"id":"::core.control_flow.conditions.multiplier_2","nodes":{"end-node":{"id":"end-node","resources":{},"kind":"end","inputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"n0":{"id":"n0","name":"fractions","resources":{},"kind":"branch","branch":{"if":{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}}}},"then":"n0-n0"},"elseIf":[{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}},"rightExpression":{"comparison":{"operator":"LTE","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":10}}}}}},"then":"n0-n1"}],"elseFail":{"failedNodeId":"fractions","message":"The input must be between 0 and 10"}},"inputBindings":[{"var":".my_input","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0":{"id":"n0-n0","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n1":{"id":"n0-n1","name":"square","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.square\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"start-node":{"id":"start-node","resources":{},"kind":"start"}},"connections":{"n0":["end-node"],"start-node":["n0"]},"edges":{"downstream":{"n0":["end-node"],"start-node":["n0"]},"upstream":{"end-node":["n0"],"n0":["start-node"],"n0-n0":["start-node"],"n0-n1":["start-node"]}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}},"outputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"inputs":{"literals":{"my_input":{"scalar":{"primitive":{"floatValue":0}}}}},"executionId":{},"tasks":{"resource_type:TASK name:\"core.control_flow.conditions.double\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.double"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","double"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}},"resource_type:TASK name:\"core.control_flow.conditions.square\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.square"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","square"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}}},"node-defaults":{},"securityContext":{},"status":{"phase":0},"rawOutputDataConfig":{},"executionConfig":{"TaskPluginImpls":null,"MaxParallelism":0,"RecoveryExecution":{},"TaskResources":{"Requests":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"},"Limits":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"}},"Interruptible":null,"OverwriteCache":false,"EnvironmentVariables":null}} \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf_crd.json b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf_crd.json index f7bd87faac..6292a3ad17 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf_crd.json +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf_crd.json @@ -1 +1 @@ -{"kind":"flyteworkflow","apiVersion":"flyte.lyft.com/v1alpha1","metadata":{"name":"name","namespace":"namespace","creationTimestamp":null,"labels":{"domain":"domain","execution-id":"name","project":"hello","shard-key":"6","workflow-name":"core-control-flow-conditions-multiplier-3"}},"spec":{"id":"::core.control_flow.conditions.multiplier_3","nodes":{"end-node":{"id":"end-node","resources":{},"kind":"end","inputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n1","var":"o0"}}}]},"n0":{"id":"n0","name":"fractions","resources":{},"kind":"branch","branch":{"if":{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}}}},"then":"n0-n0"},"elseIf":[{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":10}}}}}},"then":"n0-n1"}],"elseFail":{"failed_node_id":"fractions","message":"The input must be between 0 and 10"}},"inputBindings":[{"var":".my_input","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0":{"id":"n0-n0","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n1":{"id":"n0-n1","name":"square","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.square\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n1":{"id":"n1","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"start-node":{"id":"start-node","resources":{},"kind":"start"}},"connections":{"n0":["n1"],"n1":["end-node"],"start-node":["n0"]},"edges":{"downstream":{"n0":["n1"],"n1":["end-node"],"start-node":["n0"]},"upstream":{"end-node":["n1"],"n0":["start-node"],"n0-n0":["start-node"],"n0-n1":["start-node"],"n1":["n0"]}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}},"outputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n1","var":"o0"}}}]},"inputs":{"literals":{"my_input":{"scalar":{"primitive":{"floatValue":0}}}}},"executionId":{},"tasks":{"resource_type:TASK name:\"core.control_flow.conditions.double\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.double"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","double"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}},"resource_type:TASK name:\"core.control_flow.conditions.square\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.square"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","square"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}}},"node-defaults":{},"securityContext":{},"status":{"phase":0},"rawOutputDataConfig":{},"executionConfig":{"TaskPluginImpls":null,"MaxParallelism":0,"RecoveryExecution":{},"TaskResources":{"Requests":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"},"Limits":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"}},"Interruptible":null,"OverwriteCache":false,"EnvironmentVariables":null}} \ No newline at end of file +{"kind":"flyteworkflow","apiVersion":"flyte.lyft.com/v1alpha1","metadata":{"name":"name","namespace":"namespace","creationTimestamp":null,"labels":{"domain":"domain","execution-id":"name","project":"hello","shard-key":"6","workflow-name":"core-control-flow-conditions-multiplier-3"}},"spec":{"id":"::core.control_flow.conditions.multiplier_3","nodes":{"end-node":{"id":"end-node","resources":{},"kind":"end","inputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n1","var":"o0"}}}]},"n0":{"id":"n0","name":"fractions","resources":{},"kind":"branch","branch":{"if":{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}}}},"then":"n0-n0"},"elseIf":[{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":10}}}}}},"then":"n0-n1"}],"elseFail":{"failedNodeId":"fractions","message":"The input must be between 0 and 10"}},"inputBindings":[{"var":".my_input","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0":{"id":"n0-n0","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n1":{"id":"n0-n1","name":"square","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.square\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n1":{"id":"n1","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"start-node":{"id":"start-node","resources":{},"kind":"start"}},"connections":{"n0":["n1"],"n1":["end-node"],"start-node":["n0"]},"edges":{"downstream":{"n0":["n1"],"n1":["end-node"],"start-node":["n0"]},"upstream":{"end-node":["n1"],"n0":["start-node"],"n0-n0":["start-node"],"n0-n1":["start-node"],"n1":["n0"]}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}},"outputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n1","var":"o0"}}}]},"inputs":{"literals":{"my_input":{"scalar":{"primitive":{"floatValue":0}}}}},"executionId":{},"tasks":{"resource_type:TASK name:\"core.control_flow.conditions.double\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.double"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","double"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}},"resource_type:TASK name:\"core.control_flow.conditions.square\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.square"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","square"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}}},"node-defaults":{},"securityContext":{},"status":{"phase":0},"rawOutputDataConfig":{},"executionConfig":{"TaskPluginImpls":null,"MaxParallelism":0,"RecoveryExecution":{},"TaskResources":{"Requests":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"},"Limits":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"}},"Interruptible":null,"OverwriteCache":false,"EnvironmentVariables":null}} \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf_crd.json b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf_crd.json index cad6c84059..7ebecf9329 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf_crd.json +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf_crd.json @@ -1 +1 @@ -{"kind":"flyteworkflow","apiVersion":"flyte.lyft.com/v1alpha1","metadata":{"name":"name","namespace":"namespace","creationTimestamp":null,"labels":{"domain":"domain","execution-id":"name","project":"hello","shard-key":"6","workflow-name":"core-control-flow-conditions-nested-conditions"}},"spec":{"id":"::core.control_flow.conditions.nested_conditions","nodes":{"end-node":{"id":"end-node","resources":{},"kind":"end","inputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"n0":{"id":"n0","name":"fractions","resources":{},"kind":"branch","branch":{"if":{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}}}},"then":"n0-n0"},"elseIf":[{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":10}}}}}},"then":"n0-n1"}],"else":"n0-n2"},"inputBindings":[{"var":".my_input","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0":{"id":"n0-n0","name":"inner_fractions","resources":{},"kind":"branch","branch":{"if":{"condition":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.5}}}},"then":"n0-n0-n0"},"elseIf":[{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.5}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.7}}}}}},"then":"n0-n0-n1"}],"elseFail":{"failed_node_id":"inner_fractions","message":"Only \u003c0.7 allowed"}},"inputBindings":[{"var":".my_input","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0-n0":{"id":"n0-n0-n0","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0-n1":{"id":"n0-n0-n1","name":"square","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.square\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n1":{"id":"n0-n1","name":"square","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.square\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n2":{"id":"n0-n2","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"start-node":{"id":"start-node","resources":{},"kind":"start"}},"connections":{"n0":["end-node"],"start-node":["n0"]},"edges":{"downstream":{"n0":["end-node"],"start-node":["n0"]},"upstream":{"end-node":["n0"],"n0":["start-node"],"n0-n0":["start-node"],"n0-n0-n0":["start-node"],"n0-n0-n1":["start-node"],"n0-n1":["start-node"],"n0-n2":["start-node"]}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}},"outputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"inputs":{"literals":{"my_input":{"scalar":{"primitive":{"floatValue":0}}}}},"executionId":{},"tasks":{"resource_type:TASK name:\"core.control_flow.conditions.double\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.double"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","double"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}},"resource_type:TASK name:\"core.control_flow.conditions.square\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.square"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","square"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}}},"node-defaults":{},"securityContext":{},"status":{"phase":0},"rawOutputDataConfig":{},"executionConfig":{"TaskPluginImpls":null,"MaxParallelism":0,"RecoveryExecution":{},"TaskResources":{"Requests":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"},"Limits":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"}},"Interruptible":null,"OverwriteCache":false,"EnvironmentVariables":null}} \ No newline at end of file +{"kind":"flyteworkflow","apiVersion":"flyte.lyft.com/v1alpha1","metadata":{"name":"name","namespace":"namespace","creationTimestamp":null,"labels":{"domain":"domain","execution-id":"name","project":"hello","shard-key":"6","workflow-name":"core-control-flow-conditions-nested-conditions"}},"spec":{"id":"::core.control_flow.conditions.nested_conditions","nodes":{"end-node":{"id":"end-node","resources":{},"kind":"end","inputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"n0":{"id":"n0","name":"fractions","resources":{},"kind":"branch","branch":{"if":{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}}}},"then":"n0-n0"},"elseIf":[{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":1}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":10}}}}}},"then":"n0-n1"}],"else":"n0-n2"},"inputBindings":[{"var":".my_input","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0":{"id":"n0-n0","name":"inner_fractions","resources":{},"kind":"branch","branch":{"if":{"condition":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.5}}}},"then":"n0-n0-n0"},"elseIf":[{"condition":{"conjunction":{"leftExpression":{"comparison":{"operator":"GT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.5}}}},"rightExpression":{"comparison":{"operator":"LT","leftValue":{"var":".my_input"},"rightValue":{"primitive":{"floatValue":0.7}}}}}},"then":"n0-n0-n1"}],"elseFail":{"failedNodeId":"inner_fractions","message":"Only \u003c0.7 allowed"}},"inputBindings":[{"var":".my_input","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0-n0":{"id":"n0-n0-n0","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n0-n1":{"id":"n0-n0-n1","name":"square","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.square\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n1":{"id":"n0-n1","name":"square","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.square\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"n0-n2":{"id":"n0-n2","name":"double","resources":{},"kind":"task","task":"resource_type:TASK name:\"core.control_flow.conditions.double\"","inputBindings":[{"var":"n","binding":{"promise":{"nodeId":"start-node","var":"my_input"}}}]},"start-node":{"id":"start-node","resources":{},"kind":"start"}},"connections":{"n0":["end-node"],"start-node":["n0"]},"edges":{"downstream":{"n0":["end-node"],"start-node":["n0"]},"upstream":{"end-node":["n0"],"n0":["start-node"],"n0-n0":["start-node"],"n0-n0-n0":["start-node"],"n0-n0-n1":["start-node"],"n0-n1":["start-node"],"n0-n2":["start-node"]}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}},"outputBindings":[{"var":"o0","binding":{"promise":{"nodeId":"n0","var":"o0"}}}]},"inputs":{"literals":{"my_input":{"scalar":{"primitive":{"floatValue":0}}}}},"executionId":{},"tasks":{"resource_type:TASK name:\"core.control_flow.conditions.double\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.double"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","double"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}},"resource_type:TASK name:\"core.control_flow.conditions.square\"":{"id":{"resourceType":"TASK","name":"core.control_flow.conditions.square"},"type":"python-task","metadata":{"runtime":{"type":"FLYTE_SDK","version":"0.32.6","flavor":"python"},"retries":{}},"interface":{"inputs":{"variables":{"n":{"type":{"simple":"FLOAT"}}}},"outputs":{"variables":{"o0":{"type":{"simple":"FLOAT"}}}}},"container":{"image":"ghcr.io/flyteorg/flytecookbook:core-8b8e1a849c9adfca88049a074b10dad278f70077","args":["pyflyte-execute","--inputs","{{.input}}","--output-prefix","{{.outputPrefix}}","--raw-output-data-prefix","{{.rawOutputDataPrefix}}","--checkpoint-path","{{.checkpointOutputPrefix}}","--prev-checkpoint","{{.prevCheckpointPrefix}}","--resolver","flytekit.core.python_auto_container.default_task_resolver","--","task-module","core.control_flow.conditions","task-name","square"],"resources":{},"config":[{"key":"testKey1","value":"testValue1"},{"key":"testKey2","value":"testValue2"},{"key":"testKey3","value":"testValue3"}]}}},"node-defaults":{},"securityContext":{},"status":{"phase":0},"rawOutputDataConfig":{},"executionConfig":{"TaskPluginImpls":null,"MaxParallelism":0,"RecoveryExecution":{},"TaskResources":{"Requests":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"},"Limits":{"CPU":"0","Memory":"0","EphemeralStorage":"0","Storage":"0","GPU":"0"}},"Interruptible":null,"OverwriteCache":false,"EnvironmentVariables":null}} \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/transformers/k8s/node.go b/flytepropeller/pkg/compiler/transformers/k8s/node.go index 8a4c9248ec..06038d1039 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/node.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/node.go @@ -241,7 +241,10 @@ func buildBranchNodeSpec(branch *core.BranchNode, tasks []*core.CompiledTask, er childNodes = append(childNodes, ns...) res.Else = refStr(branch.IfElse.GetElseNode().Id) case *core.IfElseBlock_Error: - res.ElseFail = branch.IfElse.GetError() + coreError := branch.IfElse.GetError() + res.ElseFail = &v1alpha1.Error{ + Error: coreError, + } } other := make([]*v1alpha1.IfBlock, 0, len(branch.IfElse.Other)) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 0f9e95f19b..7515156be4 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -322,8 +322,8 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu subNodeStatus := nodeExecutionRequest.subNodeStatus // capture subNode error if exists - if nodeExecutionRequest.subNodeStatus.Error != nil { - subNodeFailureCollector.Collect(index, subNodeStatus.Error.Message) + if nodeExecutionRequest.subNodeStatus.Error != nil && nodeExecutionRequest.subNodeStatus.Error.ExecutionError != nil { + subNodeFailureCollector.Collect(index, subNodeStatus.Error.ExecutionError.Message) } // process events by copying from internal event recorder diff --git a/flytepropeller/pkg/controller/nodes/branch/evaluator_test.go b/flytepropeller/pkg/controller/nodes/branch/evaluator_test.go index dae8a1337b..45bb850358 100644 --- a/flytepropeller/pkg/controller/nodes/branch/evaluator_test.go +++ b/flytepropeller/pkg/controller/nodes/branch/evaluator_test.go @@ -672,8 +672,10 @@ func TestDecideBranch(t *testing.T) { ThenNode: &n2, }, }, - ElseFail: &core.Error{ - Message: userError, + ElseFail: &v1alpha1.Error{ + Error: &core.Error{ + Message: userError, + }, }, } diff --git a/flytestdlib/storage/mocks/reference_constructor.go b/flytestdlib/storage/mocks/reference_constructor.go index 85619d505e..52b526338a 100644 --- a/flytestdlib/storage/mocks/reference_constructor.go +++ b/flytestdlib/storage/mocks/reference_constructor.go @@ -59,3 +59,37 @@ func (_m *ReferenceConstructor) ConstructReference(ctx context.Context, referenc return r0, r1 } + +type ReferenceConstructor_DeepCopyReferenceConstructor struct { + *mock.Call +} + +func (_m ReferenceConstructor_DeepCopyReferenceConstructor) Return(_a0 storage.ReferenceConstructor) *ReferenceConstructor_DeepCopyReferenceConstructor { + return &ReferenceConstructor_DeepCopyReferenceConstructor{Call: _m.Call.Return(_a0)} +} + +func (_m *ReferenceConstructor) OnDeepCopyReferenceConstructor() *ReferenceConstructor_DeepCopyReferenceConstructor { + c_call := _m.On("DeepCopyReferenceConstructor") + return &ReferenceConstructor_DeepCopyReferenceConstructor{Call: c_call} +} + +func (_m *ReferenceConstructor) OnDeepCopyReferenceConstructorMatch(matchers ...interface{}) *ReferenceConstructor_DeepCopyReferenceConstructor { + c_call := _m.On("DeepCopyReferenceConstructor", matchers...) + return &ReferenceConstructor_DeepCopyReferenceConstructor{Call: c_call} +} + +// DeepCopyReferenceConstructor provides a mock function with given fields: +func (_m *ReferenceConstructor) DeepCopyReferenceConstructor() storage.ReferenceConstructor { + ret := _m.Called() + + var r0 storage.ReferenceConstructor + if rf, ok := ret.Get(0).(func() storage.ReferenceConstructor); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.ReferenceConstructor) + } + } + + return r0 +} diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index 3e84cb7acb..cb4db553e1 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -98,6 +98,10 @@ type ReferenceConstructor interface { // ConstructReference creates a new dataReference that matches the storage structure. ConstructReference(ctx context.Context, reference DataReference, nestedKeys ...string) (DataReference, error) + // Explain that in order for this interface to be part of a CRD we need to implement a DeepCopy method as per + // https://pkg.go.dev/k8s.io/gengo/examples/deepcopy-gen. + DeepCopyReferenceConstructor() ReferenceConstructor + // FromSignedURL constructs a data reference from a signed URL //FromSignedURL(ctx context.Context, signedURL string) (DataReference, error) } diff --git a/flytestdlib/storage/url_path.go b/flytestdlib/storage/url_path.go index 3aef478084..66a58fae8b 100644 --- a/flytestdlib/storage/url_path.go +++ b/flytestdlib/storage/url_path.go @@ -47,6 +47,10 @@ func (URLPathConstructor) ConstructReference(ctx context.Context, reference Data return DataReference(u.String()), nil } +func (URLPathConstructor) DeepCopyReferenceConstructor() ReferenceConstructor { + return NewURLPathConstructor() +} + func NewURLPathConstructor() URLPathConstructor { return URLPathConstructor{} }